Source: workloads/initialsync.js

/**
 *
 * @file
 * Test and measure
 * {@link https://docs.mongodb.com/manual/core/replica-set-sync/#initial-sync|initial sync}
 * performance.
 *
 * ### *Test*
 *
 * This workload performs the following actions:
 *   - creates *num_dbs* databases. (We use 1 or 32.).
 *   - creates *num_collections* / database. (We use 1 or 32.).
 *   - inserts a fixed number of documents. (5,000,000 docs in total).
 *   - creates a single compound index for each of the collections.
 *   - apply a write work load while syncing if *write_load* is true.
 *
 * On completion of the steps, above, a third member is added to the replica set.
 *
 * The initialsync metric is the number of (docs synced) / second for the
 * duration of the intial sync. The sync is defined to have completed
 * when the third node reports that it has transitioned to 'SECONDARY' state.
 *
 * The other metrics (e.g cloneDBs) represent the phases of the initial sync.
 * These actual phases vary, depending on the MongoDB version, and are generally
 * only useful for debugging purposes. They are reported as negative values
 * (latency) and higher is considered better.
 *
 * Results are reported as docs synced per second.
 *
 * ### *Setup*
 *
 * The starting point for this test is a 2 node replica set:
 *   - one primary
 *   - one secondary.
 *
 * Once these 2 nodes are populated, an additional (empty) third data bearing node is
 * added to the replica set. This additional node is added with
 * {@link https://docs.mongodb.com/manual/reference/method/rs.add/#rs.add|rs.add()},
 * as it was configured with the same replSetName.
 *
 * See the
 * {@link https://docs.mongodb.com/manual/tutorial/expand-replica-set/#add-a-member-to-an-existing-replica-set|Add Members to a Replica Set}
 * tutorial for more details.
 *
 * ### *Notes*
 *
 *   * Insert 5M (*num_docs*) documents across *num_dbs* databases and *num_collections*
 *     collections.
 *   * Each document contains the following fields:
 *     - *_id*: an ObjectId
 *     - *name*: a string "Wile E. Coyote"
 *     - *age*: an int between 0 and 120
 *     - *i*: an int, from 0 to num_docs / ( num_dbs * num_collections)
 *     - *address.street*: the string "443 W 43rd St",
 *     - *address.zip_code*: a rand Int between 0 and 100000
 *     - *address.city*: the string "New York"
 *     - *random*: a rand Int between 0 and 10000000
 *     - *phone_no*: a string comprised of the concatenation of a rand Int between
 *       0 and 1000 with  "-" and  a rand Int between 0 and 10000
 *     - *long_string*: a string created by concatenating a rand Int between 0 and
 *        100000000 and the number of 'a' characters equal to the min value of string_field_size and
 *        1000.
 *     - *other_long_string*: an unindexed string created by concatenating a rand Int
 *        between 0 and the number of 'a' characters used as overflow for when
 *        string_field_size > 1024.
 *     - *str*: a string of 1K 'a's
 *     - *numericField*: a random integer numeric field in the range 0 to 100M -1.
 *
 * ### *Owning-team*
 * mongodb/replication
 *
 * @module workloads/initialsync
 */
/* global db sharded_cluster Random */
/* global rs print tojson assert  */
/* global reportThroughput sleep jsTest */
/* global ScopedThread load Mongo getPhaseData transposePhaseLogs */
/* global waitForStates waitOplogCheck CountDownLatch reportDurations ReplSetTest isReplSet */
load('utils/log_analysis.js');
load("libs/initialsync_sync_types_impl.js");

// Set defaults for externally declared parameters.
/**
 * The number of databases to create. The default is 1.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var num_dbs = num_dbs || 1;

/**
 * The number of collections to create. The default is 1.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var num_collections = num_collections || 1;

/**
 * This value controls whether a write load is applied while running the workload.
 * If set to true then 100,000 docs are inserted while the initial sync is in progress.
 * The default is false.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var write_load = write_load || false;

/**
 * The IP address of the primary node. The default is *10.2.0.190*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var primary_addr = primary_addr || "10.2.0.190";

/**
 * The port on which mongod process runs. The default is *27017*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var port = port || "27017";

/**
 * The IP address of an empty data bearing node that needs to sync data from the primary. The default is *10.2.0.200*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var empty_node_addr = empty_node_addr || "10.2.0.200";

/**
 * This value represents how an empty node should sync data from the primary. The default is *initialSync*.
 * Valid sync_type values are 'initialSync', 'rsync'.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var sync_type = sync_type || "initialSync";

/**
 * This tells how the mongod process has to be restarted in a workload. The default is *mongod  --config /tmp/mongo_port_27017.conf*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var start_mongod = start_mongod || "mongod  --config /tmp/mongo_port_27017.conf";

/**
 * This value represents the directory where the mongod instance stores its data. The default is *data/dbs*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var db_path = db_path || "data/dbs";

/**
 * This value tells whether or not to build indexes other than the _id index. The default is *true*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var build_user_indexes = typeof build_user_indexes === 'undefined' ? true : build_user_indexes;

/**
 * This value represents the number of documents inserted. The default is *5 million*.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var num_docs = num_docs || 5 * 1024 * 1024;

/**
 * This value represents the size of long_string to be created in each doc. The default is *1KB*.
 * When string_field_size > 1000, overflow is added onto other_long_string instead of long_string.
 *
 * The actual values in use are injected by run_workloads.py, which gets it from config file,
 * see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
 */
var string_field_size = string_field_size || 1024;

var db_name = "initsync";
var num_inserts_for_write_load = 100000;
var num_threads = 16;
var test_name =
    "initialsync_dbs_" + num_dbs + "_colls_" + num_collections + "_writeload_" +
    write_load;

Random.srand(341215145);

var long_string = {str: "a"};
var other_long_string = {str: "a"};
// Enforce field size maximum so indexes can be created on long_string.
while (Object.bsonsize(long_string) < Math.min(string_field_size, 1000)) {
    long_string.str += "a";
}
// Add overflow to unindexed field other_long_string to grow the sum of the sizes of long_string
// and other_long_string to string_field_size.
while ((Object.bsonsize(long_string) + Object.bsonsize(other_long_string)) < string_field_size) {
    other_long_string.str += "a";
}

// This function returns a document with randomized fields, and one field, 'i',
// that identifies the document.
var create_doc = function(i, long_string, other_long_string) {
    var doc = {
        "name": "Wile E. Coyote",
        "age": Random.randInt(120),
        "i": i,
        "address": {
            "street": "443 W 43rd St",
            "zip_code": Random.randInt(100000),
            "city": "New York",
        },
        "random": Random.randInt(10000000),
        "phone_no": Random.randInt(1000) + "-" + Random.randInt(10000),
        "long_string": Random.randInt(100000000) + long_string.str,
        "other_long_string": Random.randInt(100000000) + other_long_string.str
    };

    return doc;
};

// This function inserts documents into the specified database and collection.
// It inserts 'num_docs' documents starting at 'min_doc_idx' for the document's
// identifier. 'create_doc' is a function that takes a 'long_string' and an
// index and returns a document.
function insert_docs(db_name, num_colls, num_docs, min_doc_idx, create_doc,
                     long_string, other_long_string, tid) {
    var currDB = db.getSiblingDB(db_name);
    var max_doc_idx = min_doc_idx + num_docs;
    Random.srand(341215145 + tid);

    // This function creates a list of documents from 'min_doc_idx' to 'max_doc_idx'.
    var create_doc_list = function(min_doc_idx, max_doc_idx) {
        var docs = [];
        for (var j = min_doc_idx; j < max_doc_idx; j++) {
            docs.push(create_doc(j, long_string, other_long_string));
        }
        return docs;
    };

    for (var j = 0; j < num_colls; j++) {
        var coll_name = "coll" + j;

        // Go one batch past the last even 1000 to ensure we get all documents.
        for (var i = min_doc_idx; i < max_doc_idx + 1000; i += 1000) {
            // Make sure to never go over the actual maximum doc index.
            var docs = create_doc_list(i, Math.min(max_doc_idx, i + 1000));
            currDB[coll_name].insert(docs, {ordered: false});
        }
    }
}

// check that the number of docs and indexes are correct
var checkContents = function(_db) {
    var num_indexes = (build_user_indexes && string_field_size <= 1024) ? 6 : 1;
    var docs_per_collection = num_docs / (num_dbs * num_collections);
    for (var i = 0; i < num_dbs; i++) {
        var currDB_name = db_name + i;
        var currDB = _db.getSiblingDB(currDB_name);
        for (var j = 0; j < num_collections; j++) {
            var coll_name = "coll" + j;

            // Assert that there are the right number of documents.
            assert.eq(currDB[coll_name].count(), docs_per_collection);

            // Assert all indexes were created.
            assert.eq(currDB[coll_name].getIndexes().length, num_indexes);
        }
    }
};

var load_data = function() {
    // Adjust num_docs to account for roundoff
    var docs_per_collection = num_docs / (num_dbs * num_collections);
    var docs_per_thread = Math.floor(docs_per_collection / num_threads);
    docs_per_thread = (docs_per_thread == 0) ? 1 : docs_per_thread;
    num_docs = docs_per_thread * num_threads * num_collections * num_dbs;
    print("Adding data with ",
          num_dbs,
          " dbs, ",
          num_collections,
          " collections, ",
          num_docs,
          " total docs, each with string fields of size ",
          string_field_size);
    print(docs_per_collection + " docs per collection");

    for (var i = 0; i < num_dbs; i++) {
        var currDB_name = db_name + i;
        var currDB = db.getSiblingDB(currDB_name);
        currDB.dropDatabase();
        print("Loading data to database: " + currDB_name);

        var threads = [];
        for (var t = 0; t < num_threads; t++) {
            // Start threads for each block of doc ids.
            var new_thread = new ScopedThread(insert_docs,
                                              currDB_name,
                                              num_collections,
                                              docs_per_thread,
                                              docs_per_thread * t,
                                              create_doc,
                                              long_string,
                                              other_long_string,
                                              t);
            new_thread.start();
            threads.push(new_thread);
        }

        for (var t = 0; t < num_threads; t++) {
            threads[t].join();
        }

        if (build_user_indexes && string_field_size <= 1024) {
            print("Creating indexes");
            for (var j = 0; j < num_collections; j++) {
                var coll_name = "coll" + j;

                assert.commandWorked(
                    currDB.runCommand({
                        createIndexes: coll_name,
                        indexes: [
                            {key: {phone_no: 1}, name: "phone_no"},
                            {key: {i: 1}, name: "i"},
                            {key: {random: 1}, name: "random"},
                            {key: {age: 1, long_string: 1}, name: "age_long_string"},
                            {key: {"address.zip_code": 1}, name: "address_zip_code"}
                        ]
                    })
                );
            }
            print("Done creating indexes.");
        }
    }
    print("Done adding documents");
    checkContents(db);
    print("Done checking contents");
};

var writeLoad = function(inserts, dbname, collname, create_doc, long_string, other_long_string) {
    print("Write load started");
    // writeLoad is in a thread, so the random number generator must be
    // seeded.
    Random.srand(341215145 + inserts);
    var coll = db.getSiblingDB(dbname).getCollection(collname);
    for (var i = 0; i < inserts; i++) {
        assert.writeOK(coll.insert(create_doc(i, long_string, other_long_string)));
    }
    print("Write load finished: " + inserts + " inserts.");
    return true;
};

var startPhaseLogCollection = function(phaseLogs) {
    var logCollectionThread = null;
    var initialSyncEndCounter = new CountDownLatch(1);
    logCollectionThread =
        new ScopedThread(collectLogData, empty_node_addr, port, phaseLogs,
                         initialSyncEndCounter);
    logCollectionThread.start();
    return [logCollectionThread, initialSyncEndCounter];
};

var stopPhaseLogCollection = function(logCollectionThread, initialSyncEndCounter) {
    if (logCollectionThread) {
        initialSyncEndCounter.countDown();
        logCollectionThread.join();
        var phaseData = logCollectionThread.returnData();

        // Data comes back read-only so we need to copy it.
        var phaseDataCopy = JSON.parse(JSON.stringify(phaseData));
        reportDurations(phaseDataCopy, test_name);
    }
};

var collectLogData = function(empty_node_addr, port, phaseLogs, stop_counter) {
    load('utils/log_analysis.js');
    load('utils/mongoshell.js');

    var startPhaseLogs = transposePhaseLogs(phaseLogs, "start");
    var endPhaseLogs = transposePhaseLogs(phaseLogs, "end");
    var needs_refresh = true;
    var phaseData = {};

    while (stop_counter.getCount() > 0) {
        sleep(10 * 1000); // Sleep for 10 secs.
        try {
            // Reconnect to empty node to survive node restart during rsync.
            if (needs_refresh) {
                var _db = getAdminDB(hostWithPort(empty_node_addr, port));
            }
            var logs = _db.adminCommand({getLog: "global"}).log;
            needs_refresh = false;
        } catch (e) {
            needs_refresh = true;
            continue;
        }
        getPhaseData(startPhaseLogs, endPhaseLogs, logs, phaseData);
    }
    return phaseData;
};

var run_initialsync = function() {
    var empty_node_admin_db = getAdminDB(hostWithPort(empty_node_addr, port));
    var uses_3dot2 =
        empty_node_admin_db.adminCommand({getParameter: 1, use3dot2InitialSync: 1}).use3dot2InitialSync;

    var phaseLogs = {};
    if (uses_3dot2) {
        phaseLogs = {
            start: {start: 'initial sync pending',
                    end: 'initial sync drop all databases'},
            dropDBs: {start: 'initial sync drop all databases',
                      end: 'initial sync clone all databases'},
            cloneDBs: {start: 'initial sync clone all databases',
                       end: 'oplog sync 1 of 3'},
            oplog1: {start: 'oplog sync 1 of 3', end: 'oplog sync 2 of 3'},
            oplog2: {start: 'oplog sync 2 of 3',
                     end: 'initial sync building indexes'},
            buildIndexes: {start: 'initial sync building indexes',
                           end: 'oplog sync 3 of 3'},
            oplog3: {start: 'oplog sync 3 of 3',
                     end: 'initial sync finishing up'},
            finishing: {start: 'initial sync finishing up',
                        end: 'initial sync done'},
            compact: {start: 'initial sync done',
                      end: 'transition to SECONDARY'},
        };
    } else {
        empty_node_admin_db.adminCommand({setParameter: 1,
                          logComponentVerbosity: {verbosity: 0,
                                                  replication: {verbosity: 2}}});
        phaseLogs = {
            start: {start: 'drop all user databases',
                    end: 'Dropping the existing oplog'},
            dropDBs: {start: 'Dropping the existing oplog',
                      end: 'Creating the oplog'},
            cloneDBs: {start: 'Creating the oplog',
                       end: 'Finished cloning data'},
            oplog1: {
                // The missing 'a' is deliberate.
                start: ['pplying operations until', 'No need to apply operations.'],
                end: 'Initial sync attempt finishing up.'},
            finishing: {start: 'Initial sync attempt finishing up.',
                        end: 'initial sync done'},
            compact: {start: 'initial sync done',
                      end: 'transition to SECONDARY'},
            cloneOneDB: {start: 'Scheduling listCollections call for database',
                         end: '    database:'},
            createIndex: {start: 'Creating indexes for',
                          end: 'Done creating indexes for'},
        };
    }

    jsTestLog(test_name + " - using " + sync_type);
    // Sanity check to make sure the current status is NotYetInitialized (94).
    assert.commandFailedWithCode(empty_node_admin_db.adminCommand("replSetGetStatus"),
                                 ErrorCodes.NotYetInitialized,
                                 "Node should be in the uninitialized state");

    // If there is no write load expected, turn off no-op oplog writes also.
    if (!write_load) {
        assert.commandWorked(db.adminCommand({setParameter: 1, writePeriodicNoops: false}));
    }

    load_data();
    var writeLoadDB = db.getSiblingDB('writeLoad');

    var coll = writeLoadDB.insert_empty;
    quiesceSystem();
    // we need to call quiesce for empty_node_admin_db as it has not yet been added to the
    // replica set
    quiesceSystem(empty_node_admin_db);

    // Start phase log collection.
    var res = startPhaseLogCollection(phaseLogs);
    var logCollectionThread = res[0];
    var initialSyncEndCounter = res[1];

    // Start the write workload.
    var insertThread = null;
    if (write_load) {
        insertThread = new ScopedThread(writeLoad,
                                        num_inserts_for_write_load,
                                        writeLoadDB.getName(),
                                        coll.getName(),
                                        create_doc,
                                        long_string,
                                        other_long_string);
        insertThread.start();
    }

    // Reconfigure the cluster.
    var elapsed_time = addNodeAndWaitForInitialSyncFinish(empty_node_admin_db);

    // Returning throughput in documents per second.
    reportThroughput(test_name, (num_docs * 1000) / elapsed_time, {nThread: 1});
    if(insertThread) {
        insertThread.join();
        assert(insertThread.returnData(), "insertThread failed.");
    }

    // Stop phase log collection.
    stopPhaseLogCollection(logCollectionThread, initialSyncEndCounter);

    writeLoadDB.dropDatabase();

    // test complete / has passed, wait to allow for primary to notice
    // the third node (empty node) state.
    waitForStates(db, 60 * 1000, "PRIMARY", "SECONDARY");
};

// This method:
// 1. confirms that the third node(empty node) reports that it is not initialized
// 2. adds it to the set (on the primary)
// 3. blocks until the third node reports that it is in a secondary state
// 4. validates the content of the third node after it transition to SECONDARY state.
//
// returns the total time taken to complete initial sync.
var addNodeAndWaitForInitialSyncFinish = function(empty_node_admin_db) {
    var start_time = Date.now();

	if (sync_type == "rsync") {
        var start_time_post_rsync = doRsync(primary_addr, empty_node_addr, port, db_path, start_mongod);
        // Reconnect to empty node to survive node restart during rsync.
        empty_node_admin_db = getAdminDB(hostWithPort(empty_node_addr, port));
    }

    print("Adding last node");
    assert.commandWorked(rs.add(hostWithPort(empty_node_addr, port)));

    var start_time_prime = Date.now();
    print(tojson(rs.status()));

    // Initial sync is over when the server goes into a secondary state.
    // Wait up to 1 hour for this to happen.
    assert.soonNoExcept(function () {
        assert.commandWorked(empty_node_admin_db.adminCommand({
            replSetTest: 1,
            waitForMemberState: ReplSetTest.State.SECONDARY,
            timeoutMillis: 60 * 60 * 1000
        }));
        return true;
    }, "Initial sync failed to complete", 60 * 60 * 1000);

    var end_time = Date.now();
    print("Empty data node now successfully in SECONDARY state");

    print("Checking secondary contents");
    empty_node_admin_db.getMongo().setSlaveOk();
    checkContents(empty_node_admin_db);

    var elapsed_time = end_time - start_time;

    var elapsed_time_prime;
    if (sync_type == "rsync")
        elapsed_time_prime = start_time_post_rsync - start_time;
    else
        elapsed_time_prime = end_time - start_time_prime;

    // "effectiveElapsedTimeSecs" includes only the data transfer time.
    // "totalElapsedTimeSecs" includes both the data transfer time and
    // time taken for pre/post data-transfer phase.
    jsTestLog("Test Result: " + tojson({
                testName: test_name,
                syncType: sync_type,
                totalElapsedTimeSecs: elapsed_time / 1000,
                effectiveElapsedTimeSecs: elapsed_time_prime / 1000
             }));
    return elapsed_time;
};

var run_tests = function() {
    if (sharded_cluster()) {
        print("Workload 'initialsync' " +
              "is not configured to run in a sharded environment.\n");
        return;
    } else if (isReplSet()) {
        db.adminCommand({fsync: 1});
        run_initialsync();
    } else {
        print("Workload 'initialsync' " +
              "is not configured to run in a standalone environment.\n");
        return;
    }

    for (var i = 0; i < num_dbs; i++) {
        var currDB = db.getSiblingDB(db_name + i);
        currDB.dropDatabase();
    }
};

run_tests();