/**
* @file
*
* Test performance of secondary reads while applying oplogs
*
* ### *Setup*
*
* This workload should be run in a replica set of at least two nodes. The
* IP addresses of the primary and one of the secondaries are specified by the
* configuration.
*
* ### *Test*
*
* On an empty replica set, the workload performs the following actions:
* - Setup stage:
* - Inserted *initial_nb_docs* documents into both the primary and the
* secondaries. This should be done before any writes and reads.
* - Actual test stage:
* - *nWriters* background writers doing infinite writes on the primary which
* will replicate these writes to secondaries.
* - *nReaders* readers doing queries on a sepcified seconary. Each query has
* *batch_size* numbers of documents and only reads documents inserted in the
* setup stage in order to have more constant behavior all the time.
*
* Read and write throughputs are reported as docs / sec. Read latencies on
* secondaries are reported as millis / ops. The workload also reports how
* delayed (millis) the secondary is from the primary when the workload finishes
* all the secondary reads.
*
* ### *Notes*
*
* - Default test time is 3 minutes.
* - Default document size is 500 bytes.
* - Default number of documents in each vectored insert is 1.
* - Default number of initial documents is 10000.
* - For production setting, the number of writer threads should be fixed and a
* varying number of reader threads is used.
*
* ### *Owning-team*
* mongodb/replication
*
* @module workloads/secondary_reads.js
*/
/**
* The actual values in use for the following parameters are injected by
* run_workloads.py, which
* gets it from config file.
* see {@link
* https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this
* hello world example}.
*
* Parameters: reader_threads, writer_threads, primary, secondary,
* test_duration_secs, batch_size,
* document_size, prefix, initial_nb_docs
*/
/**
* The number of threads to read from a specified secondary.
*/
var reader_threads = reader_threads || [1, 16, 32];
/**
* The number of threads to write on primary.
*/
var writer_threads = writer_threads || [16];
/**
* The primary IP address. The default is "10.2.0.100".
*
* The value can be changed as a parameter, see
* {@link
* https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this
* hello world example}.
*/
var primary = primary || "10.2.0.100";
/**
* The secondary IP address. The default is "10.2.0.101".
*
* The value can be changed as a parameter, see
* {@link
* https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this
* hello world example}.
*/
var secondary = secondary || "10.2.0.101";
/**
* The prefix to the name of the test.
*/
var prefix = prefix || "";
/**
* The test duration in seconds per thread level.
*/
var test_duration_secs = test_duration_secs || 180;
/**
* The number of documents for each vectored insert.
*/
var batch_size = batch_size || 1;
/**
* The size of each document in bytes.
*/
var document_size = document_size || 500;
/**
* The number of preexisting documents. Note readers will only query these
* documents from the secondary.
*/
var initial_nb_docs = initial_nb_docs || 10000;
(function() {
var dbName = "secondary_reads";
var collName = "coll";
var coll = db.getCollection(collName);
var readBatchSize = 200;
var readLimit = 100;
var nWritersPrefix;
// Create a vectored insert batch for benchStart
var documentsToInsert = [];
var nCharacters =
document_size - Object.bsonsize({_id: ObjectId(), x: 0, str: "x"}) + 1;
for (var i = 0; i < batch_size; i++) {
documentsToInsert.push({
x: {"#RAND_INT": [initial_nb_docs, 1000000000]},
str: "x".repeat(nCharacters)
});
}
/**
* Insert *initial_nb_docs* documents into the database and make sure
* secondaries have the documents.
*/
function setup() {
coll.drop();
coll.createIndex({x: 1});
print("Inserting " + initial_nb_docs + " documents into the database");
var bulk = coll.initializeOrderedBulkOp();
for (var i = 0; i < initial_nb_docs; i++) {
bulk.insert({x: i, str: "x".repeat(nCharacters)});
}
var res = bulk.execute();
assert.eq(res.nInserted, initial_nb_docs);
}
/**
* Performs secondary reads on the collection and reports throughput.
*/
function runSecondaryReads(nThreads) {
var res = benchRun({
ops: [{
op: "find",
readCmd: true,
query: {
x: {$gte: {"#RAND_INT": [0, initial_nb_docs - readLimit]}}
},
ns: coll.getFullName(),
batchSize: readBatchSize,
limit: readLimit,
readPrefMode: "secondaryPreferred",
}],
seconds: test_duration_secs,
host: secondary,
parallel: nThreads,
username: username,
password: password
});
reportThroughput(
nWritersPrefix + prefix + "read_throughput",
res["totalOps/s"] * readLimit, {nThread: nThreads});
reportThroughput(
nWritersPrefix + prefix + "reads_latency_ms",
-res["queryLatencyAverageMicros"] / 1000.0, {nThread: nThreads});
}
/**
* Starts infinite writes on the primary.
*/
function startLoad(nThreads) {
return benchStart({
ops: [{
op: "insert",
ns: coll.getFullName(),
doc: documentsToInsert,
writeCmd: true
}],
host: primary,
parallel: nThreads,
username: username,
password: password
});
}
writer_threads.forEach(function(nWriters) {
reader_threads.forEach(function(nReaders) {
nWritersPrefix = nWriters + "writers:";
setup();
// Make sure all the initial documents are on secondaries.
quiesceSystem();
print("Starting " + nWriters + " background writers...");
var writeLoad = startLoad(nWriters);
// Make sure oplog appliers on secondaries are running.
waitOplogCheck();
print("Starting " + nReaders + " readers...");
runSecondaryReads(nReaders);
print("Stopping background writers...");
var res = benchFinish(writeLoad);
reportThroughput(
nWritersPrefix + prefix + "write_throughput",
res["totalOps/s"] * batch_size, {nThread: nReaders});
// Also report how delayed the secondary is from the primary at this
// point when workload finishes.
var testFinishTs = new Date();
waitOplogCheck();
var replicationFinishTs = new Date();
reportThroughput(
nWritersPrefix + prefix + "secondary_lag_ms",
-(replicationFinishTs - testFinishTs), {nThread: nReaders});
});
});
})();