Source: libs/change_streams.js

  1. /**
  2. * Change Streams library.
  3. */
  4. /*global
  5. assert CountDownLatch Date db Error Mongo print ScopedThread sharded_cluster sleep tojson
  6. */
  /**
   * Creates a ChangeStreamListenerThread.
   *
   * A listener opens a '$changeStream' aggregation on one collection from a ScopedThread and keeps
   * reading it with 'getMore' commands until it is told to stop. The expected life cycle is
   * start() -> stop() -> join(). Once joined, the value returned by the thread is available in the
   * 'returnData' property and the listener can be started again.
   *
   * NOTE(review): start() also forwards 'username', 'password' and 'authEnabled' to the thread.
   * They are not constructor parameters, so they must be resolvable from the enclosing scope when
   * start() is called - presumably globals provided by the test harness; confirm.
   *
   * @constructor
   * @param {String} dbName - The database name.
   * @param {String} collName - The collection name.
   * @param {Array} filterPipeline - The aggregation pipeline that should follow the '$changeStream'
   *     aggregation stage. Defaults to an empty pipeline.
   * @param {Object} changeStreamOptions - The options that should be passed to the '$changeStream'
   *     aggregation stage. Defaults to no options.
   * @param {Function} changeHandler - A callback function that takes a change document, a state
   *     object and an options object as parameters. The function will be called for each change
   *     document read. Defaults to a noop function.
   * @param {Object} changeHandlerOptions - An object that will be passed as argument to the
   *     changeHandler callback function. Defaults to an empty object.
   * @param {String} targetHost - The host to connect to in order to listen for the change stream.
   *     Defaults to the default server (using the global db).
   * @param {String} readPreferenceMode - The read preference mode, sent as '$readPreference.mode'
   *     with the aggregate command. No default is applied here.
   */
  var ChangeStreamListenerThread = function(dbName, collName, filterPipeline, changeStreamOptions,
                                            changeHandler, changeHandlerOptions, targetHost,
                                            readPreferenceMode) {
      // Latch counted down by stop(); it is null whenever no running thread is waiting on it
      // (before start() and after stop()).
      let _continueCounter = null;
      // Latch counted down by the thread once its aggregate command succeeded; null before the
      // first start().
      let _notReadyCounter = null;
      let _thread = null;
      const _filterPipeline = filterPipeline || [];
      const _changeStreamOptions = changeStreamOptions || {};
      const _changeHandler = changeHandler || function (doc) {};
      const _changeHandlerOptions = changeHandlerOptions || {};

      /**
       * Create a change stream cursor and continually read from it.
       * This function is executed in a ScopedThread.
       *
       * The 'continueCounter' parameter is a CountDownLatch used by the main thread to signal this
       * function, which is executed in a ScopedThread, if it can continue or should stop. Its count
       * must be set to 1 to indicate that the function can continue, and to 0 to indicate that it
       * should stop.
       * The 'notReadyCounter' parameter is a CountDownLatch used by this function to signal the main
       * thread that it is ready and reading change streams. It must be passed to this function with
       * a count of 1, and the function will set it to 0 when ready.
       *
       * NOTE(review): everything this function needs is passed in as an argument - presumably
       * because the thread does not share the closure of the spawning scope. Keep it
       * self-contained.
       *
       * @return The final 'state.value' (0 unless the change handler updated it), or an object
       *     {"error": true, "message": <tojson of the error>} if anything threw.
       */
      function _readChangeStream(continueCounter, notReadyCounter, dbName, collName, filterPipeline,
                                 changeStreamOptions, changeHandler, changeHandlerOptions,
                                 targetHost, username, password, authEnabled, readPreferenceMode) {
          try {
              assert.eq(continueCounter.getCount(), 1,
                        "The continueCounter must have an initial value of 1.");
              assert.eq(notReadyCounter.getCount(), 1,
                        "The notReadyCounter must have an initial value of 1.");

              // State shared with the change handler; 'value' is what the thread returns.
              const state = {
                  "value": 0,
                  "clusterTime": null
              };
              const pipeline = [{"$changeStream": changeStreamOptions}].concat(filterPipeline);

              // Initialize the db object.
              let _db;
              if (targetHost) {
                  // Using a connection to the provided 'targetHost'.
                  // This is used when we need to connect to a specific mongos.
                  // NOTE(review): the credentials are inserted verbatim into the URI; confirm they
                  // never contain characters that would need percent-encoding.
                  const targetHostAuth = authEnabled
                      ? 'mongodb://'.concat(username, ':', password, '@', targetHost)
                      : targetHost;
                  const conn = new Mongo(targetHostAuth);
                  _db = conn.getDB(dbName);
              } else {
                  // Using the default connection configured by run_workloads.py.
                  _db = db.getSiblingDB(dbName);
              }

              // Function to execute a command and update the state.clusterTime variable with
              // the cluster time found in the response.
              const runCommand = function(cmdObj) {
                  const res = assert.commandWorked(_db.runCommand(cmdObj));
                  state.clusterTime = res.$clusterTime.clusterTime.getTime();
                  return res;
              };

              // Run the aggregate command.
              let res = runCommand({aggregate: collName,
                                    pipeline: pipeline,
                                    cursor: {},
                                    $readPreference: {mode: readPreferenceMode}});
              const cursorId = res.cursor.id;
              let batch = res.cursor.firstBatch;

              // Set up the function that will be called for each document.
              const callback = function(doc) {
                  changeHandler(doc, state, changeHandlerOptions);
              };

              // The aggregation command returned successfully. This thread is ready.
              notReadyCounter.countDown();

              // While the thread is not stopped, we go through each batch of result, call the
              // callback function for each document, and fetch the next batch with a 'getMore'
              // command.
              // Note that we don't wait for all the changes to be read as for some configurations
              // it can take several minutes. This can be revised when the performances improve.
              while (continueCounter.getCount() > 0) {
                  batch.forEach(callback);
                  res = runCommand({getMore: cursorId, collection: collName});
                  batch = res.cursor.nextBatch;
              }
              return state.value;
          } catch (e) {
              print("ChangeStreamListenerThread interrupted by error");
              // When the error is raised by assert.commandWorked then it is already logged so we
              // don't print all the details here. The type check keeps the 'in' operator from
              // throwing when something that is not an object was thrown.
              const alreadyLogged = (typeof e === "object" && e !== null && "codeName" in e);
              if (!alreadyLogged) {
                  print(tojson(e));
              }
              return {"error": true, "message": tojson(e)};
          }
      }

      /**
       * Returns true if the listener thread has been created and started.
       */
      this.hasStarted = function() {
          // Double ! to return true if '_thread' is truthy or false if it is falsy.
          return !!_thread;
      };

      /**
       * Returns true once the thread has signalled that its aggregate command succeeded.
       * Returns false (instead of failing on the missing latch) when start() was never called.
       */
      this.isReady = function() {
          return _notReadyCounter !== null && _notReadyCounter.getCount() === 0;
      };

      /**
       * Spawns a ScopedThread that will run _readChangeStream().
       *
       * @throws {Error} If a thread is already active (started and not yet joined).
       */
      this.start = function() {
          if (_thread) {
              throw new Error("Listener thread is already active.");
          }
          _continueCounter = CountDownLatch(1);
          _notReadyCounter = CountDownLatch(1);
          _thread = new ScopedThread(_readChangeStream, _continueCounter, _notReadyCounter,
                                     dbName, collName, _filterPipeline,
                                     _changeStreamOptions, _changeHandler, _changeHandlerOptions,
                                     targetHost, username, password, authEnabled, readPreferenceMode);
          _thread.start();
      };

      /**
       * Stops the thread by counting the continue latch down to 0.
       *
       * @throws {Error} If no thread is active, or if stop() was already called for it.
       */
      this.stop = function() {
          if (!_thread) {
              throw new Error("Listener thread is not active.");
          }
          if (!_continueCounter) {
              throw new Error("Listener thread has already been stopped.");
          }
          _continueCounter.countDown();
          _continueCounter = null;
      };

      /**
       * Joins the thread and stores its return value in 'this.returnData'.
       *
       * @throws {Error} If no thread is active, or if stop() has not been called first.
       */
      this.join = function() {
          if (!_thread) {
              throw new Error("Listener thread is not active.");
          }
          if (_continueCounter) {
              throw new Error("Listener thread has not been stopped");
          }
          _thread.join();
          this.returnData = _thread.returnData();
          _thread = null;
      };
  };
  /**
   * Starts change stream listeners.
   *
   * @param {Number} nbListeners - The number of change stream listeners to start.
   * @param {String} dbName - The database name.
   * @param {String} collName - The collection name.
   * @param {Array} filterPipeline - The aggregation pipeline stages to append to the '$changeStream'
   *     stage.
   * @param {Object} changeStreamOptions - The options to pass to the '$changeStream' stage.
   * @param {Function} changeHandler - A callback function that takes a change document and a state
   *     object as parameters. The function will be called for each change document read.
   * @param {Object} changeHandlerOptions - An object that will be passed as argument to the
   *     changeHandler callback function. Defaults to an empty object.
   * @param {Array} mongosHosts - An array of mongos hosts to connect to. The listeners are assigned
   *     to each host sequentially (round robin). Optional; a missing or empty array means that
   *     'targetHost' is used for every listener.
   * @param {String} targetHost - The host every listener connects to when no mongos hosts are
   *     given. Optional.
   * @param {String} readPreferenceMode - The read preference of the change stream aggregation
   *     pipeline.
   * @return {Array} The created (and already started) listeners.
   */
  function startListeners(nbListeners, dbName, collName, filterPipeline,
                          changeStreamOptions, changeHandler, changeHandlerOptions, mongosHosts,
                          targetHost, readPreferenceMode) {
      // An empty host list is treated like a missing one, both for the log message and for the
      // host assignment. Indexing an empty array with 'i % 0' would otherwise replace the
      // caller's 'targetHost' with undefined.
      const hasMongosHosts = Boolean(mongosHosts && mongosHosts.length !== 0);
      if (hasMongosHosts) {
          print("Starting " + nbListeners + " threads split on targets: " + tojson(mongosHosts));
      } else {
          print("Starting " + nbListeners + " threads");
      }

      const listeners = [];
      for (let i = 0; i < nbListeners; i++) {
          const listenerHost = hasMongosHosts ? mongosHosts[i % mongosHosts.length] : targetHost;
          const listener = new ChangeStreamListenerThread(dbName, collName, filterPipeline,
                                                          changeStreamOptions, changeHandler,
                                                          changeHandlerOptions, listenerHost,
                                                          readPreferenceMode);
          listener.start();
          listeners.push(listener);
      }
      return listeners;
  }
  /**
   * Blocks until every listener reports that it is ready.
   *
   * The listeners are polled in order. As soon as one of them is not ready the function sleeps for
   * 50ms and starts a new polling round, until the deadline has passed.
   *
   * @param {Array} listeners - The listeners as an array of ChangeStreamListenerThreads.
   * @throws {Error} When the listeners are not all ready before the timeout expires.
   */
  function waitForListeners(listeners) {
      // Timeout is 5s + 100ms * nb of listeners.
      // It is an arbitrary value aimed at giving enough time for all the listeners to start so
      // that when we hit a timeout it is clear something went wrong.
      const startedAt = new Date().getTime();
      const deadline = startedAt + 5000 + listeners.length * 100;

      do {
          // every() stops at the first listener that is not ready yet.
          const everyoneReady = listeners.every(function(listener) {
              return listener.isReady();
          });
          if (everyoneReady) {
              return;
          }
          sleep(50);
      } while (new Date().getTime() < deadline);

      throw new Error("Timeout while waiting to for change stream listeners to be ready.");
  }
  /**
   * Starts the change stream listeners of every collection, then waits until all of them are
   * ready.
   *
   * @param {String} dbName - The database name.
   * @param {Array} collNames - An array of collection names on which to listen for change streams.
   * @param {Number} nListeners - The total number of change stream listeners to start. It is
   *     divided by the number of collections, so it should be a multiple of that number.
   * @param {Object} changeStreamOptions - Options for the '$changeStream' stage.
   * @param {Boolean} selectiveChange - Whether additional aggregation stages should be added
   *     after the '$changeStream' stage (see getFilterPipeline()).
   * @param {Function} changeHandler - Forwarded unchanged to startListeners().
   * @param {Object} changeHandlerOptions - Forwarded unchanged to startListeners().
   * @param {Array} mongosHosts - Forwarded unchanged to startListeners().
   * @param {String} targetHost - Forwarded unchanged to startListeners().
   * @param {String} readPreferenceMode - Forwarded unchanged to startListeners().
   * @return {Object} A dictionary mapping collection names to arrays of corresponding listeners.
   */
  function startAndWaitForListeners(dbName, collNames, nListeners, changeStreamOptions, selectiveChange,
                                    changeHandler, changeHandlerOptions, mongosHosts, targetHost,
                                    readPreferenceMode) {
      print("Starting the change stream listeners");

      const stagesAfterChangeStream = getFilterPipeline(selectiveChange);
      const listenersPerCollection = nListeners / collNames.length;
      const listenersByCollection = {};

      collNames.forEach(function(name) {
          listenersByCollection[name] = startListeners(listenersPerCollection, dbName, name,
                                                       stagesAfterChangeStream, changeStreamOptions,
                                                       changeHandler, changeHandlerOptions,
                                                       mongosHosts, targetHost, readPreferenceMode);
      });
      print("The listeners are started");

      print("Waiting for the listeners to be ready");
      Object.keys(listenersByCollection).forEach(function(name) {
          waitForListeners(listenersByCollection[name]);
      });
      print("All the listeners are ready");

      return listenersByCollection;
  }
  /**
   * Stops the change stream listeners.
   *
   * All the listeners are first asked to stop, then every one of them is joined. Failures are
   * only reported once all the threads have been joined, so that a failed listener does not leave
   * the remaining (already stopped) threads unjoined.
   *
   * @param {Array} listeners - An array of ChangeStreamListenerThread objects.
   * @return {Array} An array containing the final state values ('returnData') for all the
   *     listeners, in the same order as 'listeners'.
   * @throws {Error} If at least one listener returned an error object; the message is the one of
   *     the first failed listener.
   */
  function stopListeners(listeners) {
      listeners.forEach(function(listener) {
          listener.stop();
      });

      const states = [];
      let firstFailure = null;
      listeners.forEach(function(listener) {
          listener.join();
          const data = listener.returnData;
          // 'data' is a plain value on success and an {error, message} object on failure; the
          // truthiness guard keeps a null/undefined value from raising a TypeError.
          if (data && data.error) {
              if (firstFailure === null) {
                  firstFailure = data;
              }
              return;
          }
          states.push(data);
      });

      if (firstFailure !== null) {
          throw new Error("A change stream listener failed with error: '" +
                          firstFailure.message + "'");
      }
      return states;
  }
  284. /**********************************************************************
  285. * Helper functions to handle the change streams tests configuration. *
  286. **********************************************************************/
  /**
   * Transforms a mongos host list as found in infrastructure.out.yml into
   * a list of host strings that can be used to create a new Mongo connection.
   *
   * @param {Array} mongosHostsConfig - The host entries; the 'private_ip' property of each entry
   *     is used.
   * @return {Array} The 'private_ip' values, in the same order as the entries.
   */
  function getMongosHosts(mongosHostsConfig) {
      return mongosHostsConfig.map(function(hostConfig) {
          return hostConfig.private_ip;
      });
  }
  /**
   * Generates and returns a list of collection names of the form '<baseName>_<index>', with the
   * index starting at 0.
   *
   * @param {String} baseName - The collections base name.
   * @param {Number} nCollections - The number of collection names to generate.
   * @return {Array} An array of collection names.
   */
  function generateCollectionNames(baseName, nCollections) {
      const collectionNames = [];
      let index = 0;
      while (index < nCollections) {
          collectionNames.push(`${baseName}_${index}`);
          index += 1;
      }
      return collectionNames;
  }
  /**
   * Gets the filter pipeline that should follow the '$changeStream' aggregation stage.
   *
   * @param {bool} selectiveChange - The value of the 'selective_change' option for the test.
   * @return {Array} The filter pipeline to add to the change stream pipeline: a single '$match'
   *     stage when 'selectiveChange' is truthy, no stage otherwise.
   */
  function getFilterPipeline(selectiveChange) {
      const stages = [];
      if (selectiveChange) {
          stages.push({"$match": {"a": {"$lt": 100000}}});
      }
      return stages;
  }
  /**
   * Indicates if the servers use a resume token with the BinData format.
   * This format is used before 3.7.4 and also when featureCompatibilityVersion is 3.6
   * (change streams are only supported since 3.6).
   *
   * The server version is compared component by component as numbers. Comparing the arrays
   * directly would compare their string forms, so that e.g. "3.7.10" or "10.0.0" would sort
   * before "3.7.4".
   *
   * @return {Boolean} true if the BinData resume token format is in use, false otherwise.
   */
  function useBinDataResumeToken() {
      const firstVersionWithNewFormat = [3, 7, 4];
      const serverVersion = db.version().split(".").map(function(component) {
          // parseInt also copes with suffixed components such as "0-rc1".
          return parseInt(component, 10);
      });

      let isOlderServer = false;
      for (let i = 0; i < firstVersionWithNewFormat.length; i++) {
          // A missing or non-numeric component counts as 0.
          const component = serverVersion[i] || 0;
          if (component !== firstVersionWithNewFormat[i]) {
              isOlderServer = component < firstVersionWithNewFormat[i];
              break;
          }
      }
      if (isOlderServer) {
          return true;
      }

      if (!sharded_cluster()) {
          // Mongos instances do not give access to the featureCompatibilityVersion.
          // On a sharded cluster we currently assume the FCV is not set to 3.6.
          // PERF-1447 was filed to handle FCV on sharded clusters in the future.
          const fcvRes = db.adminCommand({ getParameter: 1, featureCompatibilityVersion: 1});
          const fcv = fcvRes.featureCompatibilityVersion.version;
          if (fcv === "3.6") {
              return true;
          }
      }
      return false;
  }
  347. /**********************************************************************
  348. * Other common functions shared by the change stream test workloads. *
  349. **********************************************************************/
  /**
   * Gets the base name for a test given its configuration options.
   *
   * The name starts with the number of listeners, followed by one '_'-separated suffix per
   * non-default option. Options that already existed in earlier versions of the tests keep
   * their historical suffix (or no suffix at all) so that the test names stay comparable.
   *
   * @param {Number} nListeners - The number of change stream listeners.
   * @param {String} fullDocumentBeforeChange - The "fullDocumentBeforeChange" change stream option.
   * @param {String} fullDocument - The "fullDocument" change stream option.
   * @param {Boolean} selectiveChange - Whether the 'selective_change' option is set.
   * @param {Number} nCollections - The number of collections used by the test.
   * @param {Number} docSize - The size of the documents inserted into the collection.
   * @param {Boolean} preImageEnabled - Whether recording of the pre-images is enabled.
   * @param {String} readPreferenceMode - The read preference of the change stream aggregation
   *     pipeline. When omitted (undefined or null) it is treated like "primary" and adds no suffix.
   * @return {String} The test base name.
   */
  function getTestBaseName(nListeners, fullDocumentBeforeChange, fullDocument, selectiveChange, nCollections, docSize, preImageEnabled, readPreferenceMode) {
      let baseName = `${nListeners}`;
      if (fullDocumentBeforeChange) {
          baseName += `_${fullDocumentBeforeChange}fdbc`;
      }
      if (fullDocument) {
          if (fullDocument === "updateLookup") {
              // Previously, change stream tests that had fullDocument being set to "updateLookup"
              // were having a suffix of "_lookup". The newer "fd" suffix is not used for this
              // fullDocument value in order to preserve the historical data.
              baseName += "_lookup";
          } else {
              baseName += `_${fullDocument}fd`;
          }
      }
      if (selectiveChange) {
          baseName += "_filter";
      }
      if (preImageEnabled) {
          baseName += "_preImage";
      }
      baseName += `_${nCollections}c`;
      // Previously, change stream tests were only executed with document size of 100 bytes. The
      // newer suffix is omitted for this document size in order to preserve the historical data.
      // Number() keeps a numeric string such as "100" equivalent to the number 100.
      if (Number(docSize) !== 100) {
          baseName += `_${docSize}d`;
      }
      // "primary" is the historical behaviour, so neither it nor a missing value adds a suffix.
      // Without the null check an omitted argument would append "_undefined".
      if (readPreferenceMode != null && readPreferenceMode !== "primary") {
          baseName += `_${readPreferenceMode}`;
      }
      return baseName;
  }
  /**
   * Returns the change stream options object for the specified configuration.
   *
   * An option is only added to the result when its value is neither undefined nor null.
   *
   * @param {String} fullDocumentBeforeChange - The 'fullDocumentBeforeChange' parameter of the
   *     $changeStream stage.
   * @param {String} fullDocument - The 'fullDocument' parameter of the $changeStream stage.
   * @return {Object} The options to pass to the $changeStream stage.
   */
  function getChangeStreamOptions(fullDocumentBeforeChange, fullDocument) {
      const stageOptions = {};
      const candidates = [
          ["fullDocumentBeforeChange", fullDocumentBeforeChange],
          ["fullDocument", fullDocument]
      ];
      candidates.forEach(function(candidate) {
          const optionName = candidate[0];
          const optionValue = candidate[1];
          if (optionValue !== undefined && optionValue !== null) {
              stageOptions[optionName] = optionValue;
          }
      });
      return stageOptions;
  }
  /**
   * Returns true if the FCV is 6.1 or above (when the feature flag is removed) or if feature flag
   * 'featureFlagClusterWideConfig' is enabled with FCV 6.0, false otherwise.
   *
   * On a sharded cluster the binary version reported by db.version() is used instead of the FCV,
   * and an enabled feature flag is accepted without any further version check.
   *
   * @param {Object} db - The database object used to run the admin commands.
   * @return A truthy value when the feature is available, a falsy value otherwise.
   */
  function isClusterWideConfigFeatureAvailable(db) {
      const flagResponse = db.adminCommand({getParameter: 1, featureFlagClusterWideConfig: 1});

      // Evaluated lazily: the response is only inspected when the version alone does not
      // already make the feature available.
      const isFlagEnabled = function() {
          return flagResponse.hasOwnProperty("featureFlagClusterWideConfig") &&
              flagResponse.featureFlagClusterWideConfig.value;
      };
      const isAtLeast61 = function(major, minor) {
          return major > 6 || (major === 6 && minor >= 1);
      };

      // On replica sets, the FCV needs to be checked so that the last LTS/continuous FCV replset
      // variants are accounted for. On sharded clusters, the FCV cannot be retrieved directly from
      // the mongos so we use the binary version instead.
      if (sharded_cluster()) {
          const binaryVersion = db.version().split('.');
          const majorVersion = Number(binaryVersion[0]);
          const minorVersion = Number(binaryVersion[1]);
          return isAtLeast61(majorVersion, minorVersion) || isFlagEnabled();
      }

      const fcvResponse = db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
      const fcvParts = fcvResponse.featureCompatibilityVersion.version.split('.');
      const majorFCV = Number(fcvParts[0]);
      const minorFCV = Number(fcvParts[1]);
      return isAtLeast61(majorFCV, minorFCV) ||
          (isFlagEnabled() && majorFCV === 6 && minorFCV === 0);
  }
  /**
   * Runs a MixedCRUDWorkload for a fixed duration while change stream listeners are reading.
   *
   * Steps: initialize the workload, optionally enable pre-/post-images on the collections,
   * set the 'changeStreamOptions' cluster parameter when the cluster-wide configuration feature
   * is available, start the listeners and wait for them, run the workload for 'testDurationSecs'
   * seconds, stop it and hand the results to 'onStopCallback'.
   *
   * The test is skipped (after the workload initialization) when 'changeStreamOptions' is given
   * but the cluster-wide configuration feature is not available.
   *
   * @param {Object} db - The database object used to run the configuration commands.
   * @param {String} testName - The test name, only used in the log message.
   * @param {Object} options - The test options. 'changeStreamOptions' is the value for the
   *     cluster parameter of the same name, whereas the options of the '$changeStream' stage are
   *     built from 'fullDocumentBeforeChange' and 'fullDocument'.
   * @param {String} targetHost - Forwarded to startAndWaitForListeners().
   * @param {Function} changeHandler - Forwarded to startAndWaitForListeners().
   * @param {Function} onStopCallback - Called with the value returned by the workload's stop(),
   *     the listeners and the test base name.
   * @param {Boolean} writeOnlyWorkload - Forwarded to the MixedCRUDWorkload constructor.
   */
  function testChangeStreamsCRUDWorkload(db,
                                         testName,
                                         { dbName, collNames, docSize, nCollections, nThreads, nListeners, nDocs, retryableWrites, mongosHosts, shardCollections, preImagesEnabled, changeStreamOptions, fullDocumentBeforeChange, fullDocument, selectiveChange, changeHandlerOptions, testDurationSecs, changeStreamReadPreference },
                                         targetHost,
                                         changeHandler,
                                         onStopCallback,
                                         writeOnlyWorkload) {
      // Set the number of documents to be inserted before running the workload.
      let initialDocCount = nDocs;
      if (initialDocCount === undefined || initialDocCount === null) {
          initialDocCount = 10000000 / docSize;
      }

      const crudWorkload = new MixedCRUDWorkload(dbName, collNames, nThreads, retryableWrites,
                                                 mongosHosts, docSize, initialDocCount,
                                                 shardCollections, writeOnlyWorkload);
      // Note: the workload initialization calls quiesce at the end.
      crudWorkload.initialize();

      if (changeStreamOptions && !isClusterWideConfigFeatureAvailable(db)) {
          jsTestLog("This test requires the time-based change stream pre-/post-image retention policy to be available. Skipping test.");
          return;
      }

      if (preImagesEnabled) {
          collNames.forEach(function(name) {
              assert.commandWorked(db.getSiblingDB(dbName).runCommand(
                  {collMod: name, changeStreamPreAndPostImages: {enabled: true}}));
          });
      }

      if (isClusterWideConfigFeatureAvailable(db)) {
          // Value of the cluster parameter: the configured one, or no expiration by default.
          const clusterWideOptions = changeStreamOptions
              ? changeStreamOptions
              : {preAndPostImages: {expireAfterSeconds: "off"}};
          const adminDb = db.getSiblingDB("admin");
          const setParameterCommand =
              Object.merge({setClusterParameter: {changeStreamOptions: clusterWideOptions}});
          assert.commandWorked(adminDb.runCommand(setParameterCommand));
      }

      jsTest.log("Change streams CRUD " + testName +
                 ": nThreads=" + nThreads +
                 " nListeners=" + nListeners +
                 " preImagesEnabled=" + preImagesEnabled +
                 " docSize=" + docSize +
                 " nCollections=" + nCollections +
                 " shardCollections=" + shardCollections +
                 " " + new Date());

      // Options of the '$changeStream' aggregation stage (not the cluster parameter above).
      const stageOptions = getChangeStreamOptions(fullDocumentBeforeChange, fullDocument);
      const activeListeners = startAndWaitForListeners(dbName, collNames, nListeners, stageOptions,
                                                       selectiveChange, changeHandler,
                                                       changeHandlerOptions, mongosHosts, targetHost,
                                                       changeStreamReadPreference);

      print("Starting the workload");
      crudWorkload.start();
      sleep(testDurationSecs*1000);
      print("Finishing the workload");
      const measuredThroughput = crudWorkload.stop();

      const testBaseName = getTestBaseName(nListeners, fullDocumentBeforeChange, fullDocument,
                                           selectiveChange, nCollections, docSize,
                                           preImagesEnabled, changeStreamReadPreference);
      onStopCallback(measuredThroughput, activeListeners, testBaseName);
  }