Source: workloads/snapshot_window.js

/**
 * @file
 *
 * Compares the performance of a writes + PIT (point-in-time) reads workload using a dynamically
 * versus a statically maintained available snapshot history window size, and different storage
 * engine cache sizes.
 *
 * ### *Setup*
 *
 * Creates a collection and inserts 'nb_docs' documents over a period of
 * 'maxTargetSnapshotHistoryWindowInSeconds' seconds in order to establish an available snapshot
 * history window of maximum size. The documents are very large, so the updates workload run during
 * the test can build cache pressure quickly.
 *
 * ### *Test*
 *
 * Do the following workloads in parallel:
 *
 * * Continuous updates to the existing documents. This is done to build cache pressure as well as
 *   to move the snapshot window forward in time so that it will adjust dynamically. The stable
 *   timestamp must move forward for the window between it and the oldest timestamp to grow or
 *   shrink.
 *
 * * Point-in-time reads using an atClusterTime between the latest timestamp and
 *   'maxTargetSnapshotHistoryWindowInSeconds' seconds in the past. Each read remains open for a
 *   time, pinning the snapshot in cache until the read finishes. This attempts to simulate user
 *   snapshot transaction requests (an illustrative read is sketched below the header comment).
 *
 * The test may run with a 'static_window' parameter. If this is true, then a failpoint will be
 * activated to prevent changes to 'targetSnapshotHistoryWindowInSeconds', which is the size of the
 * available snapshot history window that the system tries to maintain. Normally, the window size
 * adjusts dynamically in response to cache pressure and PIT reads failing with SnapshotTooOld
 * errors.
 *
 * The throughput measurements will be:
 *     - successful PIT reads/sec
 *     - total successful PIT reads
 *     - total failed PIT reads
 *     - updates/sec
 *     - total updates
 *
 * ### *Owning-team*
 * mongodb/storage-execution
 *
 * @module workloads/snapshot_window
 */

/*global
 print reportThroughput sleep quiesceSystem benchStart benchFinish server username password
*/
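
/**
 * For illustration only -- this function is never called by the workload. It is a minimal sketch
 * of the kind of point-in-time read the background readers issue, assuming a server that accepts
 * read concern level "snapshot" with 'atClusterTime' on a plain find command (outside of a
 * transaction this requires MongoDB 5.0+); the real reads are driven through benchStart() below.
 * 'clusterTime' would be a Timestamp within the available snapshot history window.
 */
function exampleAtClusterTimeRead(conn, clusterTime) {
    var session = conn.startSession();
    var res = session.getDatabase("snapshot_window").runCommand({
        find: "testcoll",
        filter: {_id: {$gte: 0}},
        batchSize: 2,  // a batchSize below the limit provokes a getMore, as the workload does
        limit: 3,
        readConcern: {level: "snapshot", atClusterTime: clusterTime}
    });
    // If 'clusterTime' has fallen out of the available window, the read fails with SnapshotTooOld.
    session.endSession();
    return res;
}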

/**
 * The actual values in use for the following parameters are injected by run_workloads.py, which
 * gets them from a config file, dsi/configurations/test_control/test_control.snapshot_window.yml.
 * A hypothetical direct shell invocation is sketched after this comment block.
 *
 * Parameters:
 *    - reader_threads
 *    - writer_threads
 *    - static_window
 *    - nb_docs
 *    - server_cache_size_gb
 *    - snapshot_held_open_for_seconds
 *    - test_duration_secs
 */
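
/*
 * For illustration only: a hypothetical shell invocation that injects these globals directly,
 * without run_workloads.py (variables defined in --eval are visible to the script):
 *
 *   mongo --eval "var reader_threads = 10; var static_window = true;" \
 *       workloads/snapshot_window.js
 *
 * Any parameter left unset falls back to the defaults below via the 'x = x || default' pattern.
 */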

/**
 * The number of parallel readers and writers, for the simultaneously running read and write
 * workloads.
 */
var writer_threads = writer_threads || 3;
var reader_threads = reader_threads || 20;

/**
 * Dictates whether the test is run with a statically or dynamically sized available snapshot
 * window.
 */
var static_window = static_window || false;

/**
 * The number of documents to insert into the collection during setup.
 */
var nb_docs = nb_docs || 20;

/**
 * Specifies the storage engine's cache size in GB.
 */
var server_cache_size_gb = server_cache_size_gb || 1;

/**
 * Each successful find operation holds a snapshot open for between 0 and
 * 'snapshot_held_open_for_seconds' seconds by delaying the getMore. The specific value is
 * randomized since real transactions are held open for varying lengths of time.
 */
var snapshot_held_open_for_seconds = snapshot_held_open_for_seconds || 1;

/**
 * How long to run the workloads.
 */
var test_duration_secs = test_duration_secs || 60;

(function() {
    var dbName = "snapshot_window";
    var collName = "testcoll";
    var testDB = db.getSiblingDB(dbName);
    var coll = testDB.getCollection(collName);

    // We will use a batchSize of 2 and limit of 3 to provoke a single getMore in a find request.
    var batchSize = 2;
    var limit = 3;

    // We will generate large random strings to use in document inserts/updates.
    var listCharsString = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    var largeStringLength = 10000;

    // TODO: this can become configurable with PERF-1571.
    var maxSnapshotWindowSizeInSeconds = getParameter("maxTargetSnapshotHistoryWindowInSeconds");

    /**
     * Takes a server parameter 'field' and calls getParameter to retrieve the current setting of
     * that server parameter.
     */
    function getParameter(field) {
        var q = {getParameter: 1};
        q[field] = 1;

        var ret = assert.commandWorked(db.getSiblingDB("admin").runCommand(q));
        return ret[field];
    }

    /**
     * Calls setParameter to set server parameter 'field' to 'value'.
     */
    function setParameter(field, value) {
        var cmd = {setParameter: 1};
        cmd[field] = value;
        assert.commandWorked(db.getSiblingDB("admin").runCommand(cmd));
    }
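
    // For example, getParameter("maxTargetSnapshotHistoryWindowInSeconds") runs
    //     {getParameter: 1, maxTargetSnapshotHistoryWindowInSeconds: 1}
    // and setParameter("minMillisBetweenSnapshotWindowDec", 1000) runs
    //     {setParameter: 1, minMillisBetweenSnapshotWindowDec: 1000}
    // against the admin database.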

    /**
     * Generates a random string of 'size' characters drawn from listCharsString.
     */
    function genLargeString(size) {
        var large = "";
        for (var i = 0; i < size; ++i) {
            // Use the full list length so that the last character can also be selected.
            large += listCharsString.charAt(Math.floor(Math.random() * listCharsString.length));
        }
        return large;
    }

    /**
     * Sets up a collection and populates it with large documents over a period of
     * 'maxTargetSnapshotHistoryWindowInSeconds' seconds to fully populate the snapshot window, and
     * adjusts runtime parameters according to the control settings -- e.g. server_cache_size_gb.
     */
    function setup() {
        coll.drop();
        quiesceSystem();

        setParameter("wiredTigerEngineRuntimeConfig", "cache_size=" + server_cache_size_gb + "GB");

        // Give the system a little more time to respond to cache pressure before allowing a dynamic
        // window size decrease.
        setParameter("minMillisBetweenSnapshotWindowDec", 1000);

        // Choose whether to set a failpoint to prevent dynamic target snapshot window adjustments.
        if (static_window) {
            testDB.adminCommand({
                configureFailPoint: "preventDynamicSnapshotHistoryWindowTargetAdjustments",
                mode: "alwaysOn"
            });
        } else {
            testDB.adminCommand({
                configureFailPoint: "preventDynamicSnapshotHistoryWindowTargetAdjustments",
                mode: "off"
            });
        }

        // Perform document inserts over a period of time 'maxSnapshotWindowSizeInSeconds' such that
        // the snapshot window can be fully established with a range of timestamps.
        var maxSnapshotWindowSizeMilliseconds = maxSnapshotWindowSizeInSeconds * 1000;
        var intervalMillis = maxSnapshotWindowSizeMilliseconds / nb_docs;
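        // For example, a 10-second window (hypothetical value) with the default nb_docs = 20 gives
        // one insert roughly every 500 ms.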

        for (var i = 0; i < nb_docs; ++i) {
            var t1 = new Date();
            var bigString = genLargeString(largeStringLength);
            var doc = {"_id": i, "a": bigString};
            var res = coll.insert(doc);
            assert.writeOK(res);
            var t2 = new Date();
            if (intervalMillis > (t2 - t1)) {
                sleep(intervalMillis - (t2 - t1));
            }
        }
    }

    /**
     * Starts a background atClusterTime snapshot reads workload on the collection.
     */
    function startAtClusterTimeReads(nThreads) {
        return benchStart({
            ops: [{op: "find",
                   query: {_id: {$gte: {"#RAND_INT": [0, nb_docs - 1 - limit]}}},
                   ns: coll.getFullName(),
                   batchSize: batchSize,
                   limit: limit,
                   readCmd: true,
                   handleError: true,  // SnapshotTooOld errors
                   // Use an atClusterTime somewhere in the max available snapshot history range.
                   // The actual window size can be smaller if the server is under heavy write load.
                   useAClusterTimeWithinPastSeconds: maxSnapshotWindowSizeInSeconds,
                   maxRandomMillisecondDelayBeforeGetMore: (snapshot_held_open_for_seconds*1000)}],
            useSessions: true,
            useSnapshotReads: true,
            // This helps space out find requests and prevents the server from getting hammered
            // with requests if the window gets too small.
            delayMillisOnFailedOperation: (snapshot_held_open_for_seconds * 1000),
            host: server,
            parallel: nThreads,
            username: username,
            password: password,
        });
    }

    /**
     * Starts background update workload on the collection. These occur outside of the session and
     * any transaction.
     */
    function startUpdates(nThreads) {
        return benchStart({
            ops: [{op: "update",
                   query: {"_id": {"#RAND_INT": [ 0, nb_docs - 1 ]}},
                   update: {"$set": {"a": {"#RAND_STRING": [largeStringLength]} }},
                   writeCmd: true,
                   ns: coll.getFullName(),
                 }],
            host: server,
            parallel: nThreads,
            username: username,
            password: password
        });
    }

    // Don't quiesce the system after setup(), because setup() deliberately establishes the cache
    // state that the test depends on.
    setup();

    print("Running '" + reader_threads + "' reader threads and '" + writer_threads +
          "' writer threads, with a " + (static_window ? "statically" : "dynamically") +
          " sized available snapshot history window and cache size of '" + server_cache_size_gb +
          "' GB.");

    print("Starting background atClusterTime readers...");
    // Starting reads first since they will impact the system less than writes.
    var backgroundAtClusterTimeReadLoad = startAtClusterTimeReads(reader_threads);

    print("Starting background update writers...");
    var backgroundUpdatesLoad = startUpdates(writer_threads);

    sleep(test_duration_secs * 1000);

    print("Stopping background atClusterTime readers...");
    var readRes = benchFinish(backgroundAtClusterTimeReadLoad);

    print("Stopping background update writers...");
    var writeRes = benchFinish(backgroundUpdatesLoad);

    reportThroughput("atCluster_reads_per_sec",
                     readRes["totalOps/s"],
                     {nThread: reader_threads});
    reportThroughput("total_atCluster_reads_succeeded",
                     readRes["queries"].toNumber(),
                     {nThread: reader_threads});
    reportThroughput("total_atCluster_reads_failed",
                     readRes["errCount"].toNumber(),
                     {nThread: reader_threads});
    reportThroughput("updates_per_sec", writeRes["totalOps/s"], {nThread: writer_threads});
    reportThroughput("total_updates", writeRes["updates"].toNumber(), {nThread: writer_threads});
})();