Source: workloads/agg_merge.js

/**
 * @file
 * Measure performance of aggregation's $merge stage against the
 * {@link https://bestbuyapis.github.io/api-documentation/#overview|BestBuy Developer API data}.
 *
 * ### *Pre-requisite*
 * The dataset must be installed on the target cluster before running the test. The data can be
 *  downloaded  from
 * {@link
 * https://s3-us-west-2.amazonaws.com/dsi-donot-remove/AggPerformance/bestbuyproducts.bson.gz|here}
 *  and installed using mongorestore (mongorestore --gzip --archive=bestbuyproducts.bson.gz)
 *
 * ### *Setup*
 * None
 *
 * ### *Test*
 *
 * The tests use a simple for loop to repeatedly run a specific pipeline ending in $merge, reporting
 * the throughput in documents processed per second.
 *
 * ### *Owning-team*
 * mongodb/product-query
 * 
 * @module workloads/agg_out
 */
(function() {
    "use strict";

    load("utils/exec_helpers.js");  // For 'ExecHelpers'.

    var serverVersion = db.version().split(".");
    if (serverVersion[0] < 4 || (serverVersion[0] == 4 && serverVersion[1] < 1)) {
        jsTestLog("New $merge features are not available on version " + tojson(serverVersion) +
                  ". Skipping test.");
        return;
    }

    var testDb = db.getSiblingDB(testDbName);
    var testColl = testDb[testCollName];

    // Function which repeatedly runs a single test configuration for a set duration, recording the
    // average latency across all executions.
    function runMerge(nDocsPerMerge, matchRatio, spec) {
        // If the combination of parameters guarantees that this test will always fail, skip it.
        if ((spec.whenMatched === "fail" && matchRatio > 0) ||
            (spec.whenNotMatched === "fail" && matchRatio < 1.0)) {
            return;
        }
        // Only run the single-document test once, and skip all subsequent variations.
        if (nDocsPerMerge == 1 && (matchRatio < 1 || spec.whenMatched !== "merge")) {
            return;
        }
        // Generate a name for this test which captures all the parameters used to execute it.
        var testName = "merge_into_" + spec.into.db + "." + spec.into.coll + "_" +
            (Array.isArray(spec.whenMatched) ? "pipeline" : spec.whenMatched) + "_" +
            spec.whenNotMatched + "_" + matchRatio + "_" + nDocsPerMerge;

        // The target collection is defined by the $merge spec's 'into' parameter.
        var targetColl = db.getSiblingDB(spec.into.db)[spec.into.coll];

        // Clears out the target collection, then copies (nDocsPerMerge*matchRatio) docs from the
        // source collection into the target collection. This ensures that the percentage of
        // documents which trigger the 'whenMatched' condition during the test reflects the
        // 'matchRatio' parameter.
        function populateTargetCollection(numTrialsSoFar) {
            // Drop and recreate the collection so that each time we run the workload we start with
            // a clean slate. We can't simply delete the contents of the collection after each
            // iteration, because that can lead to WT cache issues and excessively noisy results.
            ExecHelpers.resetCollection(targetColl);
            if (matchRatio === 0) {
                return true;
            }
            // In order to ensure predictable results, we need to have a sort order to ensure the
            // same documents make it through the $limit each time. We further have to filter out
            // those which have a null productId, since they will not make it through the $merge.
            assert.doesNotThrow(function() {
                testColl.aggregate([
                    {$match: {productId: {$ne: null}}},
                    {$sort: {_id: 1}},
                    {$limit: Math.round(nDocsPerMerge * matchRatio)},
                    {$merge: {into: spec.into, whenMatched: "fail", whenNotMatched: "insert"}}
                ]);
            });
        }

        // Run a single iteration of the test case. This function is called repeatedly by
        // ExecHelpers.measurePerformance to calculate an average latency for the test.
        function runTrial(numTrialsSoFar) {
            // In order to ensure predictable results, we need to have a sort order to ensure the
            // same documents make it through the $limit each time. We further have to filter out
            // those which have a null productId, since they will not make it through the $merge.
            var pipeline = [
                {$sort: {_id: 1}},
                {$match: {productId: {$ne: null}}},
                {$limit: nDocsPerMerge},
                {$addFields: {fromMerge: true}},
                {$merge: spec}
            ];

            // Pipelines ending with $merge should return 0 results.
            assert.eq(testColl.aggregate(pipeline).itcount(), 0);

            // Test that our $merge worked and produced the right number of documents.
            var totalDocsAfterMerge = targetColl.find({}, {_id: 1}).sort({_id: 1}).itcount();
            assert.eq(totalDocsAfterMerge,
                      nDocsPerMerge,
                      "Expected " + nDocsPerMerge + " documents in '" + targetColl.getFullName() +
                          "' after $merge, but found " + totalDocsAfterMerge);
        }

        // Run the current test configuration for 90 seconds. The 'populateTargetCollection' setup
        // function will be run before each trial but will not be included in the latency stats.
        // In the case of one single document per merge limit the run time to 45 seconds. This is
        // needed because in that case the overhead of setting up the sharded collection and
        // recreating its chunk distribution is much higher than the test's run time. This would
        // lead to very long overall execution times.
        var testSeconds = nDocsPerMerge == 1 ? 45 : 90;
        var perfInfo =
            ExecHelpers.measurePerformance(runTrial, testSeconds, populateTargetCollection);

        // Report the throughput in documents per second to allow easier comparisons between
        // pipelines. This is simply the ops per second multiplied by the number of documents we
        // expect inserted per $merge stage.
        reportThroughput(testName, nDocsPerMerge * perfInfo.meanThroughput, {nThread: 1});
    }

    // We cannot use $-prefixed fieldnames in DSI's configuration files. If 'whenMatched' is a
    // pipeline, then we must prefix the name of each stage in the pipeline with '$'.
    modes.forEach(function(mode) {
        if (Array.isArray(mode.whenMatched)) {
            mode.whenMatched.forEach(function(stageObj) {
                var stageName = Object.keys(stageObj)[0];
                var stageSpec = stageObj[stageName];
                stageObj["$" + stageName] = stageSpec;
                delete stageObj[stageName];
            });
        }
    });

    for (var n = 0; n < nDocsPerMerge.length; n++) {
        for (var r = 0; r < matchRatio.length; r++) {
            for (var m = 0; m < modes.length; m++) {
                for (var t = 0; t < targets.length; t++) {
                    if (!shard_collections() && targets[t] === "hashed_id") {
                        // On unsharded deployments there are no sharded collections, so skip this
                        // namespace which is meant to be identical to another test but with a
                        // different distribution.
                        continue;
                    }
                    runMerge(nDocsPerMerge[n], matchRatio[r], {
                        into: {db: targetDB, coll: targets[t]},
                        whenMatched: modes[m].whenMatched,
                        whenNotMatched: modes[m].whenNotMatched
                    });
                }
            }
        }
    }
}());