Source: workloads/secondary_performance.js

/**
 *
 * @file
 *
 * This workload is designed to capture replication throughput and provide
 * a comparison to the primary throughput. It is solely concerned with the pace of
 * replication of a secondary process. As a result, it specifically uses a 2 node PS
 * (**1 Primary**, **1 Secondary**) replica set configuration.
 *
 * ### *Test*
 *
 * On an empty 2 node PS replica set, this workload performs the following actions:
 * - Pause replication.
 * - Insert a fixed number of documents (on the primary only as replication is paused).
 * - Note the duration required to insert all the documents (in seconds)
 *   as the primary duration.
 * - Perform a blocking write in a separate thread.
 * - Unpause replication.
 * - Wait for the blocking write thread to complete.
 * - Note the duration of the blocking write as the secondary duration (in seconds).
 *
 * Results are the primary and secondary throughputs. They are calculated from
 * *count* / *duration* docs per second.
 *
 * ### *Setup*
 *
 * The starting point for this test is a 2 node replica set:
 *   - one primary
 *   - one secondary.
 *
 * To begin the test:
 *   - a single document is inserted to ensure the collection exists on all processes.
 *   - all the replica set members are quiesced.
 *   - replication is stopped on the secondary.
 *
 * The {@link https://docs.mongodb.com/manual/reference/command/fsync/|fsync} command is used
 * to flush all writes on the primary.
 *
 * An insertion workload is run against the primary once for each of the *thread_levels*,
 * using that value as the number of insert threads.
 * Each thread inserts a total of *count* / *num insert threads* documents using
 * {@link https://docs.mongodb.com/manual/reference/method/Bulk/#unordered-operations|UnorderedBulkOp}
 * batches of at most 1000 documents each.
 *
 * Each document consists of:
 *   - *_id*: an {@link https://docs.mongodb.com/manual/reference/method/ObjectId/|ObjectId}
 *   - *s*: a string of length *size* (*size* varies from 1 to 10000)
 *
 * Next, a write with a
 * {@link https://docs.mongodb.com/manual/reference/write-concern/index.html#w-option|write concern}
 * of \{ w: all \} (w equal to the number of replica set members) is performed in a separate thread.
 * Replication is resumed and the duration of the write thread is timed.
 *
 * ### *Notes*
 *
 * Currently, the externally supplied thread_levels are [32] for wiredTiger and [10] for MMAPv1.
 *
 * For each thread level as num insert threads:
 *   * Insert *count* documents using *num insert threads* threads.
 *   * Each document contains the following fields:
 *     - *_id*: an ObjectId
 *     - *s*: a string of *size* characters
 *   * Roughly speaking, each document will be 30 + *size* bytes in length.
 *   * The value of 30 comes from the result of the following javascript
 *     ~Object.bsonsize({_id:ObjectId() , s:''})~
 *
 * ### *Owning-team*
 * mongodb/replication
 *
 * @module workloads/secondary_performance
 */
/* global db sharded_cluster Random */
/* global rs print tojson assert Thread humanReadableNumber */
/* global stopServerReplication reportThroughput sleep jsTest quiesceSystem */
/* global ScopedThread load Mongo getPhaseData transposePhaseLogs */
/* global waitOplogCheck CountDownLatch reportDurations ReplSetTest isReplSet */

/**
 * IP address of the primary. Defaults to "10.2.0.100" when no (truthy) value
 * has been supplied before this file is loaded.
 *
 * Override it as a workload parameter, see
 * {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var primary = primary ? primary : "10.2.0.100";

/**
 * IP address of the secondary. Defaults to "10.2.0.101" when no (truthy) value
 * has been supplied before this file is loaded.
 *
 * Override it as a workload parameter, see
 * {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var secondary = secondary ? secondary : "10.2.0.101";

/**
 * Length of the string stored in every inserted document. Defaults to 1.
 * Any value works as long as each document stays below the ~16MB maximum BSON
 * document size. The external configuration currently uses 1, 100, 1000 and 10000.
 *
 * Override it as a workload parameter, see
 * {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var size = size ? size : 1; // Size of the string used in the docs inserted

/**
 * Total number of documents to insert. Defaults to 10M. The external
 * configuration uses 10M, or 1M when size is 10000.
 *
 * Override it as a workload parameter, see
 * {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var count = count ? count : 10 * 1000 * 1000; // Number of documents to insert

/**
 * Thread counts used to insert documents. Defaults to [32].
 *
 * *Note*: each value only controls how many threads insert the data; it is not
 * itself a measured dimension of this test, so a single value is expected.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 *
 */
var thread_levels = thread_levels ? thread_levels : [ 32 ];

function runBenchmark(primary, secondary, benchmark, username, password, authEnabled) {
    "use strict";
    var privateCollection = "private_db.private_collection";

    var num_members = primary.getDB("test")._adminCommand("replSetGetStatus").members.length;

    // Preinsert a document so the collection exists on the secondary.
    assert.writeOK(primary.getCollection(privateCollection).insert({}, {writeConcern:{w:num_members}}));

    quiesceSystem();

    // Stop application on the secondary.
    stopServerReplication(secondary);

    var primaryTime = Date.timeFunc(function(){ return benchmark(primary); }) / 1000;

    primary.adminCommand('fsync');

    // Spawn a thread to do a w:all write. It will block until the secondary finishes replicating.
    var thread = new ScopedThread(function(host, privateCollection, num_members, username, password, authEnabled) {
        if (authEnabled){
            var hostAuth = 'mongodb://'.concat(username, ':', password, '@', host);
        } else {
            hostAuth = host;
        }
        assert.writeOK(new Mongo(hostAuth).getCollection(privateCollection)
                                      .insert({}, {writeConcern:{w:num_members}}));
    }, primary.host, privateCollection, num_members, username, password, authEnabled);
    thread.start();
    // Restart application on the secondary.
    assert.commandWorked(secondary.adminCommand({configureFailPoint: 'stopReplProducer',
                                                 mode: 'off'}));
    var secondaryTime = Date.timeFunc(function(){ return thread.join(); }) / 1000;
    primary.getDB('bench').dropDatabase({w: 2});
    secondary.adminCommand('fsync');

    return {primaryTime: primaryTime, secondaryTime: secondaryTime};
}


/**
 * Insert `count` documents of the form { s: <string of `size` commas> } into
 * bench.collection, using unordered bulk operations of at most 1000 documents.
 *
 * `mongo` may be a connection object or a host string. When run from
 * insertDocuments it receives `mongo.host` (a string) and opens its own
 * connection, embedding the credentials in a mongodb:// URI when
 * `authEnabled` is truthy.
 *
 * @param {Mongo|string} mongo - connection, or host to connect to.
 * @param {number} size - length of the string stored in each document.
 * @param {number} count - number of documents to insert.
 * @param {string} username - user name, only used for string hosts with auth.
 * @param {string} password - password, only used for string hosts with auth.
 * @param {boolean} authEnabled - whether to authenticate a new connection.
 */
function insertDocumentsThread(mongo, size, count, username, password, authEnabled) {
    var conn = mongo;
    if (typeof conn === 'string') {
        var uri = conn;
        if (authEnabled) {
            uri = 'mongodb://'.concat(username, ':', password, '@', conn);
        }
        conn = new Mongo(uri);
    }

    var target = conn.getCollection('bench.collection');
    // Array(size + 1).toString() yields `size` commas. TODO uncompressible?
    var payload = Array(size + 1).toString();

    var remaining = count;
    while (remaining > 0) {
        // Cap each batch at 1000 docs to prevent unlimited client-side buffering.
        var inThisBatch = Math.min(remaining, 1000);
        var bulk = target.initializeUnorderedBulkOp();
        for (var n = 0; n < inThisBatch; n++) {
            bulk.insert({s: payload}); // _id set automatically.
        }
        assert.writeOK(bulk.execute());
        remaining -= inThisBatch;
    }
}

/**
 * Insert `count` documents into bench.collection from `num_insert_threads`
 * parallel shell threads, and wait for all of them to finish.
 *
 * Each thread runs insertDocumentsThread against `mongo.host`, so every thread
 * opens its own connection. Documents are split as evenly as possible: the
 * first (count % num_insert_threads) threads insert one extra document, so
 * exactly `count` documents are inserted in total (assuming `count` is a
 * non-negative integer).
 *
 * @param {Mongo} mongo - connection whose `host` the insert threads connect to.
 * @param {number} size - length of the string stored in each document.
 * @param {number} count - total number of documents to insert.
 * @param {number} [num_insert_threads=1] - number of insert threads.
 * @param {string} username - user name, only used when authEnabled is truthy.
 * @param {string} password - password, only used when authEnabled is truthy.
 * @param {boolean} [authEnabled] - whether the threads authenticate. Previously
 *     this was read from a global of the same name; callers should pass it.
 */
function insertDocuments(mongo, size, count, num_insert_threads, username, password, authEnabled) {
    if (typeof num_insert_threads === 'undefined') {
        num_insert_threads = 1;
    }
    var perThread = Math.floor(count / num_insert_threads);
    var remainder = count % num_insert_threads;
    var threads = [];
    for (var i = 0; i < num_insert_threads; i++) {
        // Spread the remainder over the first threads so totals add up to `count`.
        var threadCount = perThread + (i < remainder ? 1 : 0);
        var worker = new Thread(insertDocumentsThread, mongo.host, size, threadCount,
                                username, password, authEnabled);
        worker.start();
        threads.push(worker);
    }
    threads.forEach(function(t) { t.join(); });
}

/**
 * Return `mongo` unchanged unless it is a host string. For a string, print a
 * notice and return a new connection to that host; despite the function's
 * name, the result is a Mongo connection, not a DB.
 *
 * @param {Mongo|string} mongo - existing connection, or host to connect to.
 * @param {string} username - user name, only used when authEnabled is truthy.
 * @param {string} password - password, only used when authEnabled is truthy.
 * @param {boolean} authEnabled - embed the credentials in a mongodb:// URI.
 * @returns {Mongo} the original object or a newly opened connection.
 */
function getDBIfString(mongo, username, password, authEnabled) {
    if (typeof mongo !== 'string') {
        return mongo;
    }
    print ("mongo is a string and is " + mongo);
    var connectionString = authEnabled
        ? 'mongodb://'.concat(username, ':', password, '@', mongo)
        : mongo;
    return new Mongo(connectionString);
}


/**
 * Placeholder with no behavior: it ignores its arguments and returns undefined.
 * Nothing in this source calls it.
 * TODO(review): remove it if nothing elsewhere depends on it.
 */
function runInsertTestAndReport(size, count) {
    return undefined;
}

/**
 * Entry point, run as soon as the file is loaded.
 *
 * On anything other than a replica set it prints a message and does nothing.
 * Otherwise it:
 *   - turns `primary`/`secondary` into connections;
 *   - runs the benchmark once per entry of `thread_levels`;
 *   - prints and reports primary and secondary throughput (documents per second).
 */
(function() {
    "use strict";
    if (!isReplSet()) {
        print("Workload 'secondary_performance' is not configured to run in a standalone or sharded environment.\n");
        return;
    }

    // username/password/authEnabled are not declared in this file (presumably
    // injected by the harness); copy them into locals and pass them explicitly.
    var user = username;
    var pass = password;
    var useAuth = authEnabled;

    primary = getDBIfString(primary, user, pass, useAuth);
    secondary = getDBIfString(secondary, user, pass, useAuth);

    thread_levels.forEach(function(num_insert_threads) {
        var suffix = "size_" + humanReadableNumber(size) + "_count_" + humanReadableNumber(count);
        print ("Running tests for " + suffix);

        var insertWorkload = function(mongo) {
            return insertDocuments(mongo, size, count, num_insert_threads, user, pass, useAuth);
        };
        var results = runBenchmark(primary, secondary, insertWorkload, user, pass, useAuth);
        var primarySeconds = results.primaryTime;
        var secondarySeconds = results.secondaryTime;

        print("Primary time: " + primarySeconds);
        print("Secondary time: " + secondarySeconds);
        reportThroughput("Primary_" + suffix, count / primarySeconds, {nThread: num_insert_threads});
        reportThroughput("Secondary_" + suffix, count / secondarySeconds, {nThread: num_insert_threads});
    });
}());