/**
* @file
* A simple test of basic Map/Reduce fuctionality
* <br>
*
* ### *Test*
*
* Execute an aggregation pipeline to sum all of the amounts grouped by uid. ~numJobs~ (default 200)
* output documents are generated.
* <br>
* The number of input documents equals the product of:
*
* numJobs * batches * batchSize * statusRange
*
* The default case is:
*
* 200 * 40 * 1000 * 5 = 40,000,000
*
* Results are reported as docs processed per second.
*
* ### *Setup*
*
* All variants (standalone, replica, sharded)
*
* ### *Notes*
*
* - This test stage will evenly distribute documents over 200 UID,
* the agg pipeline will calculate sum of amount based on uid.
*
* ### *Owning-team*
* mongodb/product-query
*
* @module workloads/map_reduce
*/
/* global db sharded_cluster shard_collections Random enableSharding shardCollection quiesceSystem */
/* global benchRun benchStart benchFinish sh print printjson assert */
/* global reportThroughput sleep server jsTest version emit */
/* global createJob runJobsInPool */
/**
* the destination database name.
*
*/
var db_name = "test";
/**
* The thread pool size to use generating the documents. The default is 32.
*
* The actual values in use are injected by run_workloads.py, which gets it from config file,
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*/
var poolSize = poolSize || 32;
/**
* The range of uids to generate. The default 200.
*
* The actual values in use are injected by run_workloads.py, which gets it from config file,
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*/
var numJobs = numJobs || 200;
/**
* The number of batches. The default is 40.
*
* The actual values in use are injected by run_workloads.py, which gets it from config file,
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*/
var batches = batches || 40;
/**
* The range of status values to use when generating documents. It default to 5.
* So values 0 through 4 are generated in this case.
*
* The actual values in use are injected by run_workloads.py, which gets it from config file,
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*/
var statusRange = statusRange || 5;
/**
* The {@link https://docs.mongodb.com/manual/reference/method/Bulk/#unordered-operations|unorderedBulkOp} batch size to use when generating the documents.
* The default is 1000.
*
* The actual values in use are injected by run_workloads.py, which gets it from config file,
* see {@link https://github.com/10gen/dsi/blob/138bbc5a39ca779e5b49d8d9242515329ba9d978/configurations/test_control/test_control.core.yml#L29-L31|this hello world example}.
*/
var batchSize = batchSize || 1000;
/**
* Create a range of documents for the map / reduce test.
*
* @param {string} db_name - The database name.
* @param {integer} batches - the number of batches to insert.
* @param {integer} batchSize - the number of documents per batch. Note: if this value is greater than 1000,
* then the bulk operator will transparently create batches of 1000.
* @param {integer} uid - the value of the uid field to insert.
* @param {integer} statusRange - the range of status values (zero based).
*
* @returns {object} a json document with the following fields:
* ok: if 1 then all the insert batches were successful and nInserted is the expected value
* nInserted: the number of documents inserted
* results: if ok is 1 then this field is an empty array, otherwise it contains all the batch results
*/
var staging_data = function(db_name, batches, batchSize, uid, statusRange) {
Random.srand(341215145 + this.threadId);
var d = db.getSiblingDB(db_name);
var nInserted =0;
var results = [];
var count =0;
for(var i=0; i < batches; i++) {
for(var j = 0; j < statusRange; j ++) {
var bulk = d.mr.initializeUnorderedBulkOp();
for(var k = 0; k < batchSize; k++){
// status would also equal the following but a count is simpler
// ((i * statusRange * batchSize) + (j * batchSize) + k + 1) % statusRange;
bulk.insert({
uid: uid,
amount: Random.randInt(1000000),
status: count++ % statusRange});
}
var r = bulk.execute( {w: 1});
nInserted += r.nInserted;
results.push(r);
}
}
var ok = results.every(function(r){return r.nInserted === batchSize;}) ? 1 : 0;
if (ok === 1 && nInserted !== (batches * batchSize * statusRange)) {
ok = 0;
}
if ( ok === 1){
results = [];
}
return {ok:ok, nInserted:nInserted, results: results };
};
var totalDocs = numJobs * batches * batchSize * statusRange;
print("inserting " + localize(totalDocs) + " docs in " + poolSize + " threads");
/**
* Create an array of jobs to insert the documents for the map reduce test. In this instance, the
* job paramter are fixed except for the uid. Each job generates a set of documents for a given uid
* in the desired range (0 to numJobs -1).
*
* @param {integer} numJobs - the range of uids to generate
* @param {function} func - the staging data function
* @param {string} db_name - the mr database name
* @param {string} batches - the number of batches to invoke.
* @param {string} batchSize - the size of a batch.
* @param {string} statusRange - the range of status values (0 to statusRange -1).
*
* @returns {array} returns an array of jobs that can be pased to {@link runJobsInPool|runJobsInPool}. A single
* job is an array containing the function to call as the first element of the array and the remaining elements of
* the array are the parameters to the function.
*/
var createJobs = function(staging_data, db_name, batches, batchSize, numJobs, statusRange){
return Array(numJobs).fill().map(function(v,uid){
return createJob(staging_data, db_name, batches, batchSize, uid, statusRange);
});
};
var jobs = createJobs(staging_data, db_name, batches, batchSize, numJobs, statusRange);
// drop database, and re-create index
var d = db.getSiblingDB(db_name);
d.dropDatabase();
d.mr.createIndex({uid: 1});
print("Server is : " + server);
if( sharded_cluster() ) {
enableSharding(d);
if (shard_collections) {
shardCollection(d, d.mr);
}
}
// staging data
var s = Date.now();
print("Start staging data: " + s + " of " + numJobs + " distinct uids");
var results = runJobsInPool(poolSize, jobs);
quiesceSystem();
var e = Date.now();
print("Done staging data: " + e + ". Total time taken: " + (e - s) + "ms");
var majorVersion = db.version().split('.')[0];
var minorVersion = db.version().split('.')[1];
// Helper to construct the test name. Note, we are using different test names here because
// on versions less than 4.6+ we are falling back to the legacy MR command since, many users
// on those versions still rely on the map reduce command.
function getTestName(outPolicy, majorVersion, minorVersion) {
var isLegacyMRVersion = (majorVersion == 4 && minorVersion < 5) || majorVersion < 4;
switch (outPolicy) {
case "inline":
return isLegacyMRVersion ? "map_reduce_doc_inline" : "agg_doc_inline";
case "reduce":
return isLegacyMRVersion ? "map_reduce_doc_reduce" : "agg_doc_reduce";
case "replace":
return isLegacyMRVersion ? "map_reduce_doc_replace" : "agg_doc_replace";
case "merge":
return isLegacyMRVersion ? "map_reduce_doc_merge" : "agg_doc_merge";
default:
throw new Error("invalid ouput policy: " + outPolicy);
}
}
// Set up test parameters
var outOptions = [
{
outPolicy: "inline",
out: {inline: 1},
testName: getTestName("inline", majorVersion, minorVersion),
},
{
outPolicy: "reduce",
out: {reduce: "totals", db: "mr_results"},
testName: getTestName("reduce", majorVersion, minorVersion),
},
{
outPolicy: "replace",
out: {replace: "totals", db: "mr_results"},
testName: getTestName("replace", majorVersion, minorVersion),
},
{
outPolicy: "merge",
out: {merge: "totals", db: "mr_results"},
testName: getTestName("merge", majorVersion, minorVersion),
}
];
// Helper to construct the out stage for each outPolicy.
function getLastPipelineStage(outPolicy, db, coll) {
switch (outPolicy) {
case "inline":
return null;
case "reduce":
return {"$merge":{"into": {db: db, coll: coll}, "on":"_id","let":{"new":"$$ROOT"},"whenMatched":[{"$project":{"value":{"$function":{"args":["$_id",["$value","$$new.value"]],"body":"function(k, v) {\n return Array.sum(v);\n }","lang":"js"}}}}],"whenNotMatched":"insert"}};
case "merge":
return {"$merge":{"into": {db: db, coll: coll}, "on":"_id","whenMatched":"replace","whenNotMatched":"insert"}};
case "replace":
return {"$out": {db: db, coll: coll}};
default:
throw new Error("invalid ouput policy: " + outPolicy);
}
}
// Actual test function
function runAggFold(outDoc, testName, outPolicy) {
// Flags for reporting results
var errMsg = "";
var pass = true;
// Run the legacy mapReduce command for db versions < 4.5, otherwise compute the workload
// using an agg pipeline.
if ((majorVersion == 4 && minorVersion < 5) || majorVersion < 4) {
// Create the target database manually, since mapReduce is no longer allowed to implicitly create
// the target db.
d.getSiblingDB("mr_results").createCollection("totals");
fmap = function() { emit( this.uid, this.amount ); };
freduce = function(k, v) { return Array.sum(v) ;};
query = { out: outDoc };
var start = Date.now();
result = d.mr.mapReduce(fmap, freduce, query);
var end = Date.now();
print("Done MR job! Total time taken: " + (end - start) + "ms");
// report results
if (result.ok != 1) { errMsg = "MapReduce Job return ok != 1"; }
throughput = totalDocs * 1000 / (end - start);
reportThroughput(testName, throughput,
{nThread: 1, errMsg: errMsg});
} else {
var pipelineDB = d.getSiblingDB("agg_results");
pipelineDB.totals.drop();
pipelineDB.createCollection("totals");
var outStage = getLastPipelineStage(outPolicy, "agg_results", "totals");
var aggPipeline = [{$project: {uid: 1, amount: 1}}, {$group: {_id: "$uid", value: {$sum: "$amount"}}}];
if (outPolicy != "inline") {
aggPipeline.concat(outStage);
}
var start = Date.now();
try {
d.mr.aggregate(aggPipeline);
} catch(e) {
// Set errMsg and failed flag in case of error
pass = false;
errMsg = "Agg pipeline Job return ok != 1";
}
var end = Date.now();
print("Done word count agg pipeline! Total time taken: " + (end - start) + "ms");
throughput = totalDocs * 1000 / (end - start);
reportThroughput(testName, throughput,
{nThread: 1, pass: pass, errMsg: errMsg});
}
}
outOptions.forEach(function(outOption, index) {
runAggFold(outOption["out"], outOption["testName"], outOption["outPolicy"]);
});