Source: libs/change_streams.js

/**
 * Change Streams library.
 */

/*global
 assert authEnabled CountDownLatch Date db Error jsTest jsTestLog MixedCRUDWorkload Mongo password
 print ScopedThread sharded_cluster sleep tojson username
*/


/**
 * Creates a ChangeStreamListenerThread.
 *
 * @constructor
 * @param {String} dbName - The database name.
 * @param {String} collName - The collection name.
 * @param {Array} filterPipeline - The aggregation pipeline that should follow the '$changeStream'
 *        aggregation stage.
 * @param {Object} changeStreamOptions - The options that should be passed to the '$changeStream'
 *        aggregation stage. Defaults to no options.
 * @param {Function} changeHandler - A callback function that takes a change document, a state
 *        object and an options object as parameters. The function will be called for each change
 *        document read. Defaults to a noop function.
 * @param {Object} changeHandlerOptions - An object that will be passed as argument to the
 *        changeHandler callback function. Defaults to an empty object.
 * @param {String} targetHost - The host to connect to in order to listen for the change stream.
 *        Defaults to the default server (using the global db).
 * @param {String} readPreferenceMode - The read preference of the change stream aggregation pipeline.
 */
var ChangeStreamListenerThread = function(dbName, collName, filterPipeline, changeStreamOptions,
                                          changeHandler, changeHandlerOptions, targetHost, readPreferenceMode) {
    var _continueCounter = null;
    var _notReadyCounter = null;
    var _thread = null;
    var _filterPipeline = filterPipeline || [];
    var _changeStreamOptions = changeStreamOptions || {};
    var _changeHandler = changeHandler || function (doc) {};
    var _changeHandlerOptions = changeHandlerOptions || {};

    /**
     * Create a change stream cursor and continually read from it.
     * This function is executed in a ScopedThread.
     *
     * The 'continueCounter' parameter is a CountDownLatch used by the main thread to signal this
     * function, which is executed in a ScopedThread, if it can continue or should stop. Its count
     * must be set to 1 to indicate that the function can continue, and to 0 to indicate that it
     * should stop.
     * The 'notReadyCounter' parameter is a CountDownLatch used by this function to signal the main
     * thread that it is ready and reading change streams. It must be passed to this function with
     * a count of 1, and the function will set it to 0 when ready.
     */
    function _readChangeStream(continueCounter, notReadyCounter, dbName, collName, filterPipeline,
                               changeStreamOptions, changeHandler, changeHandlerOptions,
                               targetHost, username, password, authEnabled, readPreferenceMode) {
        try {
            assert.eq(continueCounter.getCount(), 1,
                      "The continueCounter must have an initial value of 1.");
            assert.eq(notReadyCounter.getCount(), 1,
                      "The notReadyCounter must have an initial value of 1.");
            var state = {
                "value": 0,
                "clusterTime": null
            };
            var pipeline = [{"$changeStream": changeStreamOptions}].concat(filterPipeline);

            // Initialize the db object.
            var _db;
            if (targetHost) {
                // Using a connection to the provided 'targetHost'.
                // This is used when we need to connect to a specific mongos.
                var targetHostAuth;
                if (authEnabled) {
                    targetHostAuth = 'mongodb://'.concat(username, ':', password, '@', targetHost);
                } else {
                    targetHostAuth = targetHost;
                }
                var conn = new Mongo(targetHostAuth);
                _db = conn.getDB(dbName);
            } else {
                // Using the default connection configured by run_workloads.py.
                _db = db.getSiblingDB(dbName);
            }

            // Function to execute a command and update the state.clusterTime variable with
            // the cluster time found in the response.
            var runCommand = function(cmdObj) {
                var res = assert.commandWorked(_db.runCommand(cmdObj));
                state.clusterTime = res.$clusterTime.clusterTime.getTime();
                return res;
            };

            // Run the aggregate command.
            var res = runCommand({aggregate: collName,
                                  pipeline: pipeline,
                                  cursor: {},
                                  $readPreference: {mode: readPreferenceMode}});
            var cursorId = res.cursor.id;
            var batch = res.cursor.firstBatch;

            // Set up the function that will be called for each document.
            var callback = function(doc) {
                changeHandler(doc, state, changeHandlerOptions);
            };

            // The aggregation command returned successfully. This thread is ready.
            notReadyCounter.countDown();
            // While the thread is not stopped, go through each batch of results, call the
            // callback function for each document, and fetch the next batch with a 'getMore'
            // command.
            // Note that we don't wait for all the changes to be read because for some
            // configurations it can take several minutes. This can be revised when performance
            // improves.
            while (continueCounter.getCount() > 0) {
                batch.forEach(callback);
                res = runCommand({getMore: cursorId, collection: collName});
                batch = res.cursor.nextBatch;
            }
            return state.value;
        } catch (e) {
            print("ChangeStreamListenerThread interrupted by error");
            // When the error is raised by assert.commandWorked then it is already logged so we
            // don't print all the details here.
            if (!("codeName" in e)) {
                print(tojson(e));
            }
            return {"error": true, "message": tojson(e)};
        }
    }

    /**
     * Returns true if the listener thread has been created and started.
     */
    this.hasStarted = function() {
        // Double ! to return true if '_thread' is truthy or false if it is falsy.
        return !!_thread;
    };

    /**
     * Returns true if the listener thread has signaled that it is ready and reading the change
     * stream.
     */
    this.isReady = function() {
        return _notReadyCounter.getCount() === 0;
    };

    /**
     * Spawns a ScopedThread that will run _readChangeStream().
     */
    this.start = function() {
        if (_thread) {
            throw new Error("Listener thread is already active.");
        }

        _continueCounter = CountDownLatch(1);
        _notReadyCounter = CountDownLatch(1);
        _thread = new ScopedThread(_readChangeStream, _continueCounter, _notReadyCounter,
                                   dbName, collName, _filterPipeline,
                                   _changeStreamOptions, _changeHandler, _changeHandlerOptions,
                                   targetHost, username, password, authEnabled, readPreferenceMode);
        _thread.start();
    };

    /** Stops the thread. */
    this.stop = function() {
        if (!_thread) {
            throw new Error("Listener thread is not active.");
        }

        _continueCounter.countDown();
        _continueCounter = null;
    };

    /** Joins the thread. */
    this.join = function() {
        if (!_thread) {
            throw new Error("Listener thread is not active.");
        }
        if (_continueCounter) {
            throw new Error("Listener thread has not been stopped");
        }
        _thread.join();
        this.returnData = _thread.returnData();
        _thread = null;
    };
};
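
// Illustrative sketch of the listener lifecycle (the database name, collection name and change
// handler below are hypothetical; with no 'targetHost' the listener uses the global 'db'
// connection):
//
//     var listener = new ChangeStreamListenerThread("test", "coll_0", [], {},
//                                                   function(doc, state) { state.value += 1; },
//                                                   {}, undefined, "primary");
//     listener.start();
//     waitForListeners([listener]);
//     // ... run operations that generate change events ...
//     listener.stop();
//     listener.join();
//     print(tojson(listener.returnData));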


/**
 * Starts change stream listeners.
 *
 * @param {Number} nbListeners - The number of change stream listeners to start.
 * @param {String} dbName - The database name.
 * @param {String} collName - The collection name.
 * @param {Array} filterPipeline - The aggregation pipeline stages to append to the '$changeStream'
 *        stage.
 * @param {Object} changeStreamOptions - The options to pass to the '$changeStream' stage.
 * @param {Function} changeHandler - A callback function that takes a change document and a state
 *        object as parameters. The function will be called for each change document read.
 * @param {Object} changeHandlerOptions - An object that will be passed as argument to the
 *        changeHandler callback function. Defaults to an empty object.
 * @param {Array} mongosHosts - An array of mongos hosts to connect to. The listeners are assigned
 *        to each host sequentially (round robin). Optional.
 * @param {String} targetHost - The host to connect to when 'mongosHosts' is not provided. Optional.
 * @param {String} readPreferenceMode - The read preference of the change stream aggregation pipeline.
 * @return {Array} The created listeners.
 */
function startListeners(nbListeners, dbName, collName, filterPipeline, changeStreamOptions,
                        changeHandler, changeHandlerOptions, mongosHosts, targetHost,
                        readPreferenceMode) {
    if (mongosHosts && mongosHosts.length !== 0) {
        print("Starting " + nbListeners + " threads split on targets: " + tojson(mongosHosts));
    } else {
        print("Starting " + nbListeners + " threads");
    }
    var listeners = [];
    for (var i = 0; i < nbListeners; i++) {
        if (mongosHosts) {
            targetHost = mongosHosts[i % mongosHosts.length];
        }
        var listener = new ChangeStreamListenerThread(dbName, collName, filterPipeline,
                                                      changeStreamOptions, changeHandler,
                                                      changeHandlerOptions, targetHost,
                                                      readPreferenceMode);
        listener.start();
        listeners.push(listener);
    }
    return listeners;
}
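
// Sketch of starting listeners spread over several mongos hosts (the host addresses and the
// change handler are hypothetical):
//
//     var listeners = startListeners(4, "test", "coll_0", [], {},
//                                    function(doc, state) { state.value += 1; }, {},
//                                    ["10.0.0.1", "10.0.0.2"], undefined, "primary");
//     // Listeners 0 and 2 connect to 10.0.0.1; listeners 1 and 3 connect to 10.0.0.2.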


/**
 * Waits for the listeners to be ready.
 *
 * @param {Array} listeners - The listeners as an array of ChangeStreamListenerThreads.
 */
function waitForListeners(listeners) {
    var start = new Date().getTime();
    // Timeout is 5s + 100ms * nb of listeners.
    // It is an arbitrary value aimed at giving enough time for all the listeners to start so
    // that when we hit a timeout it is clear something went wrong.
    var end_timeout = start + 5000 + listeners.length * 100;
    do {
        var allReady = true;
        for (var i = 0; i < listeners.length; i++) {
            if (!listeners[i].isReady()) {
                allReady = false;
                sleep(50);
                break;
            }
        }
        if (allReady) {
            return;
        }
    } while (new Date().getTime() < end_timeout);
    throw new Error("Timeout while waiting to for change stream listeners to be ready.");
}


/**
 * Starts and waits for the change stream listeners to be ready.
 *
 * @param {String} dbName - The database name.
 * @param {Array} collNames - An array of collection names on which to listen for change streams.
 * @param {Number} nListeners - The number of change streams listeners to start. Should be a
 *        multiple of the number of collections.
 * @param {Object} changeStreamOptions - Options for the '$changeStream' stage.
 * @param {Boolean} selectiveChange - Whether additional aggregation stages should be added
 *        after the '$changeStream' stage.
 * @param {Function} changeHandler - A callback function that will be called for each change
 *        document read.
 * @param {Object} changeHandlerOptions - An object that will be passed as argument to the
 *        changeHandler callback function.
 * @param {Array} mongosHosts - An array of mongos hosts to connect to. Optional.
 * @param {String} targetHost - The host to connect to when 'mongosHosts' is not provided. Optional.
 * @param {String} readPreferenceMode - The read preference of the change stream aggregation pipeline.
 * @return {Object} A dictionary mapping collection names to arrays of corresponding listeners.
 */
function startAndWaitForListeners(dbName, collNames, nListeners, changeStreamOptions, selectiveChange,
                                  changeHandler, changeHandlerOptions, mongosHosts, targetHost, readPreferenceMode) {
    print("Starting the change stream listeners");
    var listeners = {};
    var collName = null;
    var filterPipeline = getFilterPipeline(selectiveChange);
    var nCollListeners = nListeners / collNames.length;

    for (var i = 0; i < collNames.length; i++) {
        collName = collNames[i];
        listeners[collName] = startListeners(nCollListeners, dbName, collName, filterPipeline,
                                             changeStreamOptions, changeHandler, changeHandlerOptions,
                                             mongosHosts, targetHost, readPreferenceMode);
    }
    print("The listeners are started");

    print("Waiting for the listeners to be ready");
    for (collName in listeners) {
        waitForListeners(listeners[collName]);
    }
    print("All the listeners are ready");
    return listeners;
}

/**
 * Stops the change stream listeners.
 *
 * @param {Array} listeners - An array of ChangeStreamListenerThread objects.
 * @return {Array} An array containing the final state values for all the listeners.
 */
function stopListeners(listeners) {
    var states = [];
    var listener;
    for (var i = 0; i < listeners.length; i++) {
        listeners[i].stop();
    }
    for (var j = 0; j < listeners.length; j++) {
        listener = listeners[j];
        listener.join();
        if (listener.returnData.error) {
            throw new Error("A change stream listener failed with error: '" +
                            listener.returnData.message + "'");
        }
        states.push(listener.returnData);
    }
    return states;
}
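
// End-to-end sketch combining the helpers above (the database name, collection base name and
// change handler are hypothetical, and the workload that generates changes is elided):
//
//     var collNames = generateCollectionNames("coll", 2);
//     var options = getChangeStreamOptions(undefined, "updateLookup");
//     var listeners = startAndWaitForListeners("test", collNames, 4, options, false,
//                                              function(doc, state) { state.value += 1; },
//                                              {}, undefined, undefined, "primary");
//     // ... run a workload that generates change events ...
//     for (var collName in listeners) {
//         print(tojson(stopListeners(listeners[collName])));
//     }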


/**********************************************************************
 * Helper functions to handle the change streams tests configuration. *
 **********************************************************************/

/**
 * Transforms a mongos host list as found in infrastructure.out.yml into
 * a list of host strings that can be used to create a new Mongo connection.
 */
function getMongosHosts(mongosHostsConfig) {
    var mongos_targets = [];
    for (var i = 0; i < mongosHostsConfig.length; i++) {
        mongos_targets.push(mongosHostsConfig[i].private_ip);
    }
    return mongos_targets;
}
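
// For example, given entries shaped like the mongos host list in infrastructure.out.yml (the IP
// addresses below are hypothetical):
//
//     getMongosHosts([{private_ip: "10.0.0.1"}, {private_ip: "10.0.0.2"}]);
//     // returns ["10.0.0.1", "10.0.0.2"]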


/**
 * Generates and returns a list of collection names.
 *
 * @param {String} baseName - The collections base name.
 * @param {Number} nCollections - The number of collection names to generate.
 * @return {Array} An array of collection names.
 */
function generateCollectionNames(baseName, nCollections) {
    var names = [];
    for (var i = 0; i < nCollections; i++) {
        names.push(baseName + "_" + i);
    }
    return names;
}
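
// For example:
//
//     generateCollectionNames("coll", 3);
//     // returns ["coll_0", "coll_1", "coll_2"]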


/**
 * Gets the filter pipeline that should follow the '$changeStream' aggregation stage.
 *
 * @param {Boolean} selectiveChange - The value of the 'selective_change' option for the test.
 * @return {Array} The filter pipeline to add to the change stream pipeline.
 */
function getFilterPipeline(selectiveChange) {
    if (selectiveChange) {
        return [{"$match": {"a": {"$lt": 100000}}}];
    } else {
        return [];
    }
}
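
// When 'selectiveChange' is true, the full aggregation pipeline built by _readChangeStream()
// becomes:
//
//     [{"$changeStream": changeStreamOptions}, {"$match": {"a": {"$lt": 100000}}}]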

/**
 * Indicates if the servers use a resume token with the BinData format.
 * This format is used before 3.7.4 and also when featureCompatibilityVersion is 3.6
 * (change streams are only supported since 3.6).
 */
function useBinDataResumeToken() {
    var version = db.version().split(".");
    if (version < [3, 7, 4]) {
        return true;
    }
    if (!sharded_cluster()) {
        // Mongos instances do not give access to the featureCompatibilityVersion.
        // On a sharded cluster we currently assume the FCV is not set to 3.6.
        // PERF-1447 was filed to handle FCV on sharded clusters in the future.
        var fcvRes = db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
        var fcv = fcvRes.featureCompatibilityVersion.version;
        if (fcv == "3.6") {
            return true;
        }
    }
    return false;
}


/**********************************************************************
 * Other common functions shared by the change stream test workloads. *
 **********************************************************************/

/**
 * Gets the base name for a test given its configuration options.
 *
 * @param {Number} nListeners - The number of change stream listeners.
 * @param {String} fullDocumentBeforeChange - The "fullDocumentBeforeChange" change stream option.
 * @param {String} fullDocument - The "fullDocument" change stream option.
 * @param {Boolean} selectiveChange - Whether the 'selective_change' option is set.
 * @param {Number} nCollections - The number of collections used by the test.
 * @param {Number} docSize - The size of the documents inserted into the collection.
 * @param {Boolean} preImageEnabled - Whether recording of the pre-images is enabled.
 * @param {String} readPreferenceMode - The read preference of the change stream aggregation pipeline.
 * @return {String} The test base name.
 */
function getTestBaseName(nListeners, fullDocumentBeforeChange, fullDocument, selectiveChange, nCollections, docSize, preImageEnabled, readPreferenceMode) {
    var baseName = `${nListeners}`;
    if (fullDocumentBeforeChange) {
        baseName += `_${fullDocumentBeforeChange}fdbc`;
    }
    if (fullDocument) {
        if (fullDocument == "updateLookup") {
            // Previously, change stream tests with fullDocument set to "updateLookup" used the
            // "_lookup" suffix. The newly added "fd" suffix format is omitted for this
            // fullDocument value in order to preserve the historical data.
            baseName += "_lookup";
        } else {
            baseName += `_${fullDocument}fd`;
        }
    }
    if (selectiveChange) {
        baseName += "_filter";
    }
    if (preImageEnabled) {
        baseName += "_preImage";
    }
    baseName += `_${nCollections}c`;

    // Previously, change stream tests were only executed with a document size of 100 bytes. The
    // newly added suffix is omitted for this document size in order to preserve the historical
    // data.
    if (docSize != 100) {
        baseName += `_${docSize}d`;
    }

    if (readPreferenceMode != "primary") {
        baseName += `_${readPreferenceMode}`;
    }

    return baseName;
}
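
// For example, a hypothetical configuration with 16 listeners, fullDocument set to
// "updateLookup", selective changes, one collection, 100-byte documents, no pre-images and the
// "primary" read preference yields:
//
//     getTestBaseName(16, undefined, "updateLookup", true, 1, 100, false, "primary");
//     // returns "16_lookup_filter_1c"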

/**
 * Returns the change stream options object for the specified configuration.
 *
 * @param {String} fullDocumentBeforeChange - The 'fullDocumentBeforeChange' parameter of the $changeStream stage.
 * @param {String} fullDocument - The 'fullDocument' parameter of the $changeStream stage.
 * @return {Object} The '$changeStream' stage options object built from the provided parameters.
 */
function getChangeStreamOptions(fullDocumentBeforeChange, fullDocument) {
    let result = {};
    if (fullDocumentBeforeChange != undefined) {
        result["fullDocumentBeforeChange"] = fullDocumentBeforeChange;
    }
    if (fullDocument != undefined) {
        result["fullDocument"] = fullDocument;
    }
    return result;
}
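
// For example:
//
//     getChangeStreamOptions(undefined, "updateLookup");
//     // returns {fullDocument: "updateLookup"}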

/**
 * Returns true if the FCV is 6.1 or above (where the feature flag has been removed) or if the
 * 'featureFlagClusterWideConfig' feature flag is enabled with FCV 6.0, false otherwise.
 *
 * @param {Object} db - The database connection used to run the feature checks.
 */
function isClusterWideConfigFeatureAvailable(db) {
    const getParam = db.adminCommand({getParameter: 1, featureFlagClusterWideConfig: 1});
    const flagEnabled = getParam.hasOwnProperty("featureFlagClusterWideConfig") &&
        getParam.featureFlagClusterWideConfig.value;
    // On replica sets, the FCV needs to be checked so that the last LTS/continuous FCV replset
    // variants are accounted for. On sharded clusters, the FCV cannot be retrieved directly from
    // the mongos, so we use the binary version instead.
    if (!sharded_cluster()) {
        const fcv = db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
        const majorFCV = fcv.featureCompatibilityVersion.version.split('.')[0];
        const minorFCV = fcv.featureCompatibilityVersion.version.split('.')[1];
        return (majorFCV > 6 || (majorFCV == 6 && minorFCV >= 1)) ||
            (flagEnabled && majorFCV == 6 && minorFCV == 0);
    } else {
        const majorVersion = db.version().split('.')[0];
        const minorVersion = db.version().split('.')[1];
        return (majorVersion > 6 || (majorVersion == 6 && minorVersion >= 1)) || flagEnabled;
    }
}

/**
 * Runs a change streams CRUD workload: initializes a MixedCRUDWorkload and the change stream
 * listeners, runs them for 'testDurationSecs' seconds, then stops both and reports the results
 * through 'onStopCallback'.
 */
function testChangeStreamsCRUDWorkload(db,
                                       testName,
                                       {
                                           dbName, collNames, docSize, nCollections, nThreads,
                                           nListeners, nDocs, retryableWrites, mongosHosts,
                                           shardCollections, preImagesEnabled, changeStreamOptions,
                                           fullDocumentBeforeChange, fullDocument, selectiveChange,
                                           changeHandlerOptions, testDurationSecs,
                                           changeStreamReadPreference
                                       },
                                       targetHost,
                                       changeHandler,
                                       onStopCallback,
                                       writeOnlyWorkload) {
    // Set the number of documents to be inserted before running the workload.
    var nDocuments = nDocs != undefined ? nDocs : 10000000 / docSize;
    var workload = new MixedCRUDWorkload(dbName, collNames, nThreads, retryableWrites,
                                         mongosHosts, docSize, nDocuments, shardCollections, writeOnlyWorkload);

    // Note: the workload initialization calls quiesce at the end.
    workload.initialize();

    if (changeStreamOptions && !isClusterWideConfigFeatureAvailable(db)) {
        jsTestLog("This test requires the time-based change stream pre-/post-image retention " +
                  "policy to be available. Skipping test.");
        return;
    }
    if (preImagesEnabled) {
        for (var i = 0; i < collNames.length; i++) {
            assert.commandWorked(db.getSiblingDB(dbName).runCommand(
                {collMod: collNames[i], changeStreamPreAndPostImages: {enabled: true}}));
        }
    }
    if (isClusterWideConfigFeatureAvailable(db)) {
        // Set the 'changeStreamOptions' cluster parameter, defaulting to no time-based
        // pre-/post-image expiration when no options were provided.
        changeStreamOptions =
            changeStreamOptions ? changeStreamOptions : {preAndPostImages: {expireAfterSeconds: "off"}};
        assert.commandWorked(db.getSiblingDB("admin").runCommand(
            {setClusterParameter: {changeStreamOptions: changeStreamOptions}}));
    }

    jsTest.log("Change streams CRUD " + testName + ": nThreads=" + nThreads + " nListeners=" + nListeners + " preImagesEnabled=" + preImagesEnabled + " docSize=" + docSize + " nCollections=" + nCollections + " shardCollections=" + shardCollections + " " + new Date());

    // Options for the '$changeStream' stage itself (distinct from the 'changeStreamOptions'
    // cluster parameter set above).
    var stageOptions = getChangeStreamOptions(fullDocumentBeforeChange, fullDocument);
    var listeners = startAndWaitForListeners(dbName, collNames, nListeners, stageOptions,
                                             selectiveChange, changeHandler, changeHandlerOptions,
                                             mongosHosts, targetHost, changeStreamReadPreference);

    print("Starting the workload");
    workload.start();
    sleep(testDurationSecs * 1000);
    print("Finishing the workload");
    var throughput = workload.stop();

    onStopCallback(throughput, listeners, getTestBaseName(nListeners, fullDocumentBeforeChange, fullDocument, selectiveChange, nCollections, docSize, preImagesEnabled, changeStreamReadPreference));
}
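
// Hedged sketch of how a workload script might invoke this driver (all configuration values, the
// change handler and the reporting callback are hypothetical; MixedCRUDWorkload and this library
// must already be loaded):
//
//     testChangeStreamsCRUDWorkload(db, "updates", {
//         dbName: "test", collNames: generateCollectionNames("coll", 1), docSize: 100,
//         nCollections: 1, nThreads: 16, nListeners: 16, retryableWrites: false,
//         mongosHosts: undefined, shardCollections: false, preImagesEnabled: false,
//         fullDocument: "updateLookup", selectiveChange: false, changeHandlerOptions: {},
//         testDurationSecs: 180, changeStreamReadPreference: "primary"
//     },
//     undefined /* targetHost */,
//     function(doc, state) { state.value += 1; } /* changeHandler */,
//     function(throughput, listeners, baseName) { print(baseName + ": " + throughput); },
//     false /* writeOnlyWorkload */);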