Source: workloads/change_streams_listen_throughput.js

/**
 * @file
 * Test the performance of change stream pipelines.
 *
 * ### *Test*
 *
 * This workload initializes the database with 'numDocs' inserts (defined below) into a test
 * collection, drops the test collection, quiesces the database, and then executes change stream
 * pipelines that return the events recorded by the initial insert and drop commands.
 *
 * Each change stream resumes from a token captured at the start of the test, so it observes the
 * insert and drop events from the initialization phase and can execute without any other
 * concurrent activity. The drop command invalidates each change stream cursor as soon as it
 * reaches the end of the initialization activity, so cursors never block waiting for future
 * events.
 *
 * There are three test pipelines: one with just a $changeStream stage, one with a $match stage
 * that admits 30% of events into the result set, and one with a $match stage that admits 60% of
 * events. The throughput of a change stream pipeline is computed as the number of events it
 * returns per second.
 *
 * ### *Setup*
 *
 * Supports replica sets and sharded clusters but not standalone instances.
 *
 * ### *Owning-team*
 * mongodb/replication
 *
 * @module workloads/change_streams_listen_throughput
 */
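
// For reference, the three benchmarked change stream pipelines have the following shapes, where
// <token> is the resume token captured before initialization and the $match predicates target the
// 'a' field that insertDocs() draws uniformly from [0, 100):
//
//     [{$changeStream: {resumeAfter: <token>}}]
//     [{$changeStream: {resumeAfter: <token>}}, {$match: {'fullDocument.a': {$lt: 30}}}]
//     [{$changeStream: {resumeAfter: <token>}}, {$match: {'fullDocument.a': {$lt: 60}}}]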

var dbName = "change_streams_listen_throughput";
var collName = "test";

var batchSize = 1000;
var numDocs = 100000;

// Number of times to execute each benchmarked change stream.
var numTrials = 10;

// Number of concurrent listener threads at each test level; may be overridden by the test runner.
var thread_levels = thread_levels || [1, 8, 16];

function cleanup(dbToTest) {
    dbToTest.dropDatabase();
}

/**
 * Initialize the oplog by inserting 'numDocs' documents into a test collection and then dropping
 * the collection. Note that the data itself is not important; we only need the commands to be in
 * the oplog. That being said, we have found that using a trivially small document size leads to
 * noise in the perf numbers, so we pad the documents to be roughly 1 KiB. This size means that
 * trivial changes to the change stream output format do not significantly affect the throughput.
 *
 * Returns a resume token that can be used to start a change stream from just before the insert
 * commands generated by this function.
 */
function insertDocs(numDocs, coll) {
    var changeStream = coll.watch();
    var initialResumeToken = changeStream.getResumeToken();
    changeStream.close();

    Random.setRandomSeed(341215145);
    var paddingString = "x".repeat(1000);

    for (var i = 0; i < numDocs / batchSize; i++) {
        var bulk = coll.initializeUnorderedBulkOp();
        for (var j = 0; j < batchSize; j++) {
            bulk.insert({a: Random.randInt(100), padding: paddingString});
        }
        bulk.execute();
    }

    coll.drop();

    return initialResumeToken;
}
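
// For reference, each insert above surfaces in the change stream as an event shaped roughly as
// follows (abridged; exact field order and resume token contents are illustrative):
//
//     {
//         _id: <resume token>,
//         operationType: "insert",
//         ns: {db: "change_streams_listen_throughput", coll: "test"},
//         documentKey: {_id: <ObjectId>},
//         fullDocument: {_id: <ObjectId>, a: <0-99>, padding: "xxx...x"}
//     }
//
// The trailing collection drop produces a 'drop' event followed by an 'invalidate' event, which
// closes the cursor.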

/**
 * The entry point for each benchmark thread: executes a test change stream 'numTrials' times and
 * returns the average throughput from all of the trials. The caller can specify additional stages
 * in the 'pipeline' parameter to append to the change stream pipeline.
 */
function runQueriesInThread(testName, numTrials, dbName, collName, resumeToken, pipeline) {
    function runOneTrial() {
        var dbForTest = db.getSiblingDB(dbName);
        var totalEvents = 0;

        var startTime = Date.now();

        var pipelineWithChangeStream =
            [{$changeStream: {resumeAfter: resumeToken}}].concat(pipeline);
        var response = dbForTest.runCommand(
            {aggregate: collName, pipeline: pipelineWithChangeStream, cursor: {}});
        assert.commandWorked(response);

        var totalBatches = 1;
        if (response.cursor.firstBatch) {
            totalEvents += response.cursor.firstBatch.length;
        }

        while (response.cursor.id > 0) {
            response = dbForTest.runCommand({getMore: response.cursor.id, collection: collName});
            assert.commandWorked(response);

            if (response.cursor.nextBatch) {
                totalEvents += response.cursor.nextBatch.length;
            }
            totalBatches++;
        }

        var endTime = Date.now();
        var elapsedTime = endTime - startTime;

        assert.gt(totalEvents, 0);

        jsTestLog('Test ' + testName + ' observed ' + totalEvents + ' events in ' + totalBatches +
                  ' batches in ' + elapsedTime + ' ms');

        // Throughput is reported as events returned per second.
        var elapsedTimeSec = elapsedTime / 1000;
        return totalEvents / elapsedTimeSec;
    }

    var throughputSum = 0;
    for (var i = 0; i < numTrials; ++i) {
        throughputSum += runOneTrial();
    }

    // Throughput for this thread is reported as the average throughput of each trial.
    return throughputSum / numTrials;
}
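
// Worked example with illustrative numbers: if one trial returns all ~100,000 insert events (plus
// the terminal 'drop' and 'invalidate' events) in 4,000 ms, its throughput is roughly
// 100,000 / 4 = 25,000 events/sec. Each thread reports the mean over 'numTrials' such trials, and
// runQueriesAndReportThroughput() below sums those per-thread means.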

/**
 * Launches 'nThreads' benchmark threads, waits for all of them to finish, and reports the total
 * throughput from all the threads.
 */
function runQueriesAndReportThroughput(testName, resumeToken, pipeline, nThreads) {
    var threads = [];
    for (var i = 0; i < nThreads; ++i) {
        var newThread = new ScopedThread(runQueriesInThread,
                                         testName,
                                         numTrials,
                                         dbName,
                                         collName,
                                         resumeToken,
                                         pipeline);
        newThread.start();
        threads.push(newThread);
    }

    // Total throughput for all threads is reported as the sum of each thread's throughput.
    var throughputTotal = 0;
    threads.forEach(function(thread) {
        thread.join();
        var throughput = thread.returnData();
        throughputTotal += throughput;
    });

    reportThroughput(testName, throughputTotal, {nThread: nThreads});
}

function run_test() {
    var dbToTest = db.getSiblingDB(dbName);
    var coll = dbToTest.getCollection(collName);

    if (sharded_cluster()) {
        enableSharding(dbToTest);
        if (shard_collections()) {
            shardCollection(dbToTest, coll);
        }
    }

    var resumeToken = insertDocs(numDocs, coll);
    quiesceSystem();
    waitOplogCheck();

    thread_levels.forEach(function(nThreads) {
        runQueriesAndReportThroughput("change_streams_listener", resumeToken, [], nThreads);
        quiesceSystem();

        // This pipeline's $match includes 30% of events.
        runQueriesAndReportThroughput("change_streams_listener_filtered_30",
                                      resumeToken,
                                      [{$match: {'fullDocument.a': {$lt: 30}}}],
                                      nThreads);
        quiesceSystem();

        // This pipeline's $match includes 60% of events.
        runQueriesAndReportThroughput("change_streams_listener_filtered_60",
                                      resumeToken,
                                      [{$match: {'fullDocument.a': {$lt: 60}}}],
                                      nThreads);
        quiesceSystem();
    });

    cleanup(dbToTest);
}

run_test();