/**
* @file
*
* Test performance of database load in the presence of change stream listeners.
* <br>
*
* ### *Setup*
* None.
*
* ### *Test*
* The test applies a mixed CRUD workload to a collection in the cluster while a number of threads
* read change streams for the collection.
* The measured throughputs are the throughputs of the CRUD workload.
* The reading threads simply loop over the change stream cursor, increasing a counter without
* saving or looking at the data.
*
* The test takes two main optional parameters: an array specifying the number of CRUD threads
* 'thread_levels' and an array specifying the number of change stream listeners 'listener_levels'.
* The test then runs all the combinations of number of threads and number of listeners specified.
* Other parameters exist to specify different change stream behaviors, the number of collections
* to target or sharding specific options.
*
* Results are reported as ops/sec.
*
* ### *Notes*
* - The test runs for 5 minutes.
*
* ### *Owning-team*
* mongodb/replication
*
* @module workloads/change_streams_crud_throughput
*/
/*global
print reportThroughput sleep
*/
load("libs/change_streams.js");
load("libs/mixed_workload.js");
/**
* The actual values in use for the following parameters are injected by run_workloads.py, which gets it from the config file.
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*
* Parameters: thread_levels, listener_levels, selective_change, nb_collections, mongos_hosts, shard_collections,
* doc_size, nb_docs, pre_images_enabled, full_document_before_change, full_document, change_stream_options, test_duration_secs.
*/
/**
* The number of threads to run in parallel. The default is [4, 64].
* **Note:** Thread level should be divisible by 4 * nb_collections * mongos_hosts.length.
*/
var thread_levels = thread_levels || [4, 64];
/** The number of listeners to run in parallel. The default is [1, 10, 100, 1000]. */
var listener_levels = listener_levels || [1, 10, 100, 1000];
/**
* Whether the change stream pipeline should include an additional filtering stage.
* Defaults to false.
*/
var selective_change = selective_change || false;
/** The number of collections to run the workload and listeners against. Defaults to [1]. */
var nb_collections = nb_collections || [1];
/**
* The mongos hosts assigned IP addresses. The expected format is a list of dictionaries containing
* a "private_ip" field.
* This parameter is used to connect through multiple mongoses in a sharded cluster.
* If left unassigned, the workload and listeners will connect to the default mongos.
* Only applies if the cluster is sharded.
*/
var mongos_hosts = mongos_hosts || [];
/**
* If the collections should be sharded when running against a sharded cluster.
* Defaults to true.
* Only applies if the cluster is sharded.
*/
var shard_collections = shard_collections !== false;
/**
* The size in bytes of the documents to insert in each collection during the workload.
* Defaults to [100] document size.
*/
var doc_sizes = doc_sizes || [100];
/**
* The number of documents to insert in each collection before starting the workload.
* Defaults to undefined.
* If 'nb_docs' is undefined, the value is computed dynamically: nb_docs = 10_000_000 / doc_size.
*/
var nb_docs = nb_docs || undefined;
/**
* Represents the collection option of pre-image recording being enabled for all collections.
* Defaults to false.
*/
var pre_images_enabled = pre_images_enabled || false;
/**
* Possible modes for the 'fullDocumentBeforeChange' parameter of the $changeStream stage.
* Defaults to undefined.
*/
var full_document_before_change = full_document_before_change || undefined;
/**
* Possible modes for the 'fullDocument' parameter of the $changeStream stage.
* Defaults to undefined.
*/
var full_document = full_document || undefined;
/*
* A specification for the change streams options.
* Defaults to undefined.
*/
var change_stream_options = change_stream_options || undefined;
/**
* The test duration in seconds per thread level and listener level combination.
* Defaults to 300 (5 minutes).
* Note: stalls come in 60 sec cycles.
*/
var test_duration_secs = test_duration_secs || 5*60;
/**
* Determines the read preference for the change stream.
* Defaults to primary.
*/
var change_stream_read_preference = change_stream_read_preference || "primary";
(function() {
"use strict";
load("utils/exec_helpers.js"); // For 'ExecHelpers'.
var dbName = "change_streams";
var retryableWrites = false;
var mongosHosts;
if (sharded_cluster()) {
mongosHosts = getMongosHosts(mongos_hosts);
}
// Options for the changeHandler callback;
var changeHandlerOptions = null;
/**
* A callback function for the ChangeStreamListenerThread that counts the number of documents.
*/
function changeHandler(doc, state, options) {
state.value += 1;
}
/**
* A callback function that reports the throughput data on finishing running the given workload.
*/
function onStopHandler(throughput, listeners, baseName, nThreads) {
reportThroughput(baseName + "_findOne",
throughput.findOne, {nThread: nThreads});
reportThroughput(baseName + "_insert",
throughput.insert, {nThread: nThreads});
reportThroughput(baseName + "_update",
throughput.update, {nThread: nThreads});
reportThroughput(baseName + "_delete",
throughput.delete, {nThread: nThreads});
reportThroughput(baseName + "_total",
throughput.total, {nThread: nThreads});
print("Stopping the listeners");
var counts = null;
for (var collName in listeners) {
counts = stopListeners(listeners[collName]);
}
}
/**
* Tests the CRUD workload performance.
*
* @param {Array} collNames - An array of collection names on which to apply the workload.
* @param {Number} docSize - The size in bytes of the documents to insert in each collection during the workload.
* @param {Number} nCollections - The number of collections to run the workload and listeners against.
* @param {Number} nThreads - The number of CRUD workload threads to use.
* @param {Number} nListeners - The number of change stream listeners to start.
*/
function testChangeStreams(collNames, docSize, nCollections, nThreads, nListeners) {
var workloadParameters = {
dbName: dbName,
collNames: collNames,
docSize: docSize,
nCollections: nCollections,
nThreads: nThreads,
nListeners: nListeners,
nDocs: nb_docs,
retryableWrites: retryableWrites,
mongosHosts: mongosHosts,
shardCollections: shard_collections,
preImagesEnabled: pre_images_enabled,
changeStreamOptions: change_stream_options,
fullDocumentBeforeChange: full_document_before_change,
fullDocument: full_document,
selectiveChange: selective_change,
changeHandlerOptions: changeHandlerOptions,
testDurationSecs: test_duration_secs,
changeStreamReadPreference: change_stream_read_preference,
};
testChangeStreamsCRUDWorkload(db, "throughput", workloadParameters, undefined /* targetHost */, changeHandler, function (throughput, listeners, baseName) {
onStopHandler(throughput, listeners, baseName, nThreads);
});
}
ExecHelpers
.cartesianProduct(
doc_sizes,
nb_collections,
thread_levels,
listener_levels
)
.forEach(function(args) {
var nCollections = args[1];
var nThreads = args[2];
var collectionNames = generateCollectionNames("change_streams_" + nThreads, nCollections);
testChangeStreams(collectionNames, args[0], nCollections, nThreads, args[3]);
});
})();