/**
* @file
* Test performance of a change stream pipeline.
*
* ### *Test*
*
* This workload initializes the database with 'numDocs' inserts (defined below) into a test
* collection, drops the test collection, quiesces the database, and then executes change stream
* pipelines that return the events recorded by the initial insert and drop commands.
*
* Change streams use a resume token from the beginning of the test in order to observe the insert
* and drop commands from the initialization phase, which allows them to execute without any other
* concurrent activity. The drop command invalidates change stream cursors as soon as they reach the
* end of initialization activity, preventing them from spending time waiting for future events.
*
* There are three test pipelines, one with just a $changeStream, one with a $match stage that
* includes 30% of events in the result set, and one with a $match stage that includes 60% of events
* in the result set. The throughput of a change stream pipeline is computed as the number of events
* it returns per second.
*
* ### *Setup*
*
* Supports replica sets and sharded clusters but not standalone instances.
*
* ### *Owning-team*
* mongodb/replication
*
* @module workloads/change_streams_listen_throughput
*/
// Database and collection used by this workload.
var dbName = "change_streams_listen_throughput";
var collName = "test";
// Number of documents inserted per bulk write during initialization.
var batchSize = 1000;
// Total number of documents inserted (and therefore insert events recorded in the oplog).
var numDocs = 100000;
// Number of times to execute each benchmarked change stream.
var numTrials = 10;
// Concurrency levels to benchmark. The `thread_levels || ...` idiom lets the test harness
// predefine thread_levels before this file is loaded; otherwise the default applies.
var thread_levels = thread_levels || [1, 8, 16];
/**
 * Tears down all state created by this workload.
 *
 * @param {DB} dbToTest - Handle to the workload database; every collection in it is discarded.
 */
function cleanup(dbToTest) {
    // A single dropDatabase removes the test collection and any metadata in one shot.
    dbToTest.dropDatabase();
}
/**
 * Initialize the oplog by inserting 'numDocs' documents into a test collection and then dropping
 * the collection. Note that the data itself is not important; we only need the commands to be in
 * the oplog. That being said, we have found that using a trivially small document size leads to
 * noise in the perf numbers, so we pad the documents to be roughly 1 KiB. This size means that
 * trivial changes to the change stream output format do not significantly affect the throughput.
 *
 * Returns a resume token that can be used to start a change stream from just before the insert
 * commands generated by this function.
 */
function insertDocs(numDocs, coll) {
    // Capture a resume token from before any inserts so test change streams can replay the
    // entire initialization workload.
    var changeStream = coll.watch();
    var initialResumeToken = changeStream.getResumeToken();
    changeStream.close();

    // Fixed seed keeps the distribution of 'a' values (used by the $match pipelines)
    // identical across runs.
    Random.setRandomSeed(341215145);
    var paddingString = "x".repeat(1000);

    // Insert exactly 'numDocs' documents in batches of at most 'batchSize'. The final batch
    // is clamped so a 'numDocs' that is not a multiple of 'batchSize' no longer over-inserts
    // a full extra batch.
    for (var inserted = 0; inserted < numDocs; inserted += batchSize) {
        var bulk = coll.initializeUnorderedBulkOp();
        var currentBatch = Math.min(batchSize, numDocs - inserted);
        for (var j = 0; j < currentBatch; j++) {
            bulk.insert({a: Random.randInt(100), padding: paddingString});
        }
        bulk.execute();
    }

    // The drop writes an invalidating event to the oplog, so test change streams terminate as
    // soon as they have consumed all initialization events.
    coll.drop();
    return initialResumeToken;
}
/**
 * The entry point for each benchmark thread: executes a test change stream 'numTrials' times and
 * returns the average throughput from all of the trials. The caller can specify additional stages
 * in the 'pipeline' parameter to append to the change stream pipeline.
 */
function runQueriesInThread(testName, numTrials, dbName, collName, resumeToken, pipeline) {
    // Runs the change stream once to exhaustion and returns its throughput in events/sec.
    // Takes no parameters: it closes over the enclosing function's arguments. (Previously it
    // was invoked with five arguments that were silently ignored.)
    function runOneTrial() {
        var dbForTest = db.getSiblingDB(dbName);
        var totalEvents = 0;
        var startTime = Date.now();
        var pipelineWithChangeStream =
            [{$changeStream: {resumeAfter: resumeToken}}].concat(pipeline);
        var response = dbForTest.runCommand(
            {aggregate: collName, pipeline: pipelineWithChangeStream, cursor: {}});
        assert.commandWorked(response);
        var totalBatches = 1;
        if (response.cursor.firstBatch) {
            totalEvents += response.cursor.firstBatch.length;
        }
        // Drain the cursor. The collection drop from initialization invalidates the stream, so
        // the server eventually returns cursor id 0 and the loop terminates without waiting
        // for future events.
        while (response.cursor.id > 0) {
            response = dbForTest.runCommand({getMore: response.cursor.id, collection: collName});
            assert.commandWorked(response);
            if (response.cursor.nextBatch) {
                totalEvents += response.cursor.nextBatch.length;
            }
            totalBatches++;
        }
        var endTime = Date.now();
        var elapsedTime = endTime - startTime;
        assert.gt(totalEvents, 0);
        jsTestLog('Test ' + testName + ' observed ' + totalEvents + ' events in ' + totalBatches +
                  ' batches with elapsed time: ' + elapsedTime);
        // Throughput is reported as events returned per second. Clamp the elapsed time to at
        // least 1 ms so a sub-millisecond trial cannot report an Infinity throughput.
        var elapsedTimeSec = Math.max(elapsedTime, 1) / 1000;
        return totalEvents / elapsedTimeSec;
    }
    var throughputSum = 0;
    for (var i = 0; i < numTrials; ++i) {
        throughputSum += runOneTrial();
    }
    // Throughput for this thread is reported as the average throughput of each trial.
    return throughputSum / numTrials;
}
/**
 * Launches 'nThreads' benchmark threads, waits for all of them to finish, and reports the total
 * throughput from all the threads.
 */
function runQueriesAndReportThroughput(testName, resumeToken, pipeline, nThreads) {
    var threads = [];
    for (var i = 0; i < nThreads; ++i) {
        // Pass exactly the six parameters 'runQueriesInThread' declares. (An extra 'nThreads'
        // argument used to be appended here and was silently ignored.)
        var newThread = new ScopedThread(
            runQueriesInThread, testName, numTrials, dbName, collName, resumeToken, pipeline);
        newThread.start();
        threads.push(newThread);
    }
    // Total throughput for all threads is reported as the sum of each thread's throughput.
    var throughputTotal = 0;
    threads.forEach(function(thread) {
        thread.join();
        var throughput = thread.returnData();
        throughputTotal += throughput;
    });
    reportThroughput(testName, throughputTotal, {nThread: nThreads});
}
/**
 * Top-level driver: optionally shards the test collection, populates the oplog via insertDocs,
 * then measures change stream throughput at each concurrency level in 'thread_levels'.
 * The system is quiesced between measurements so runs do not interfere with one another.
 */
function run_test() {
    var dbToTest = db.getSiblingDB(dbName);
    var coll = dbToTest.getCollection(collName);
    // 'sharded_cluster', 'shard_collections', 'enableSharding', and 'shardCollection' are
    // provided by the workload harness; sharding is applied only when running against a
    // sharded cluster.
    if (sharded_cluster()) {
        enableSharding(dbToTest);
        if (shard_collections()) {
            shardCollection(dbToTest, coll);
        }
    }
    // Capture a resume token that predates all initialization events.
    var resumeToken = insertDocs(numDocs, coll);
    quiesceSystem();
    // NOTE(review): this asserts that the identifier 'waitOplogCheck' is truthy rather than
    // invoking it; if it is a harness function, 'waitOplogCheck()' may have been intended —
    // confirm against the harness.
    assert(waitOplogCheck);
    thread_levels.forEach(function(nThreads) {
        // Unfiltered stream: every initialization event is returned.
        runQueriesAndReportThroughput("change_streams_listener", resumeToken, [], nThreads);
        quiesceSystem();
        // This pipeline's $match includes 30% of events.
        runQueriesAndReportThroughput("change_streams_listener_filtered_30",
                                      resumeToken,
                                      [{$match: {'fullDocument.a': {$lt: 30}}}],
                                      nThreads);
        quiesceSystem();
        // This pipeline's $match includes 60% of events.
        runQueriesAndReportThroughput("change_streams_listener_filtered_60",
                                      resumeToken,
                                      [{$match: {'fullDocument.a': {$lt: 60}}}],
                                      nThreads);
        quiesceSystem();
    });
    cleanup(dbToTest);
}
run_test();