/**
* @file
*
* Util functions to be used as mongo shell rc file
*
* @module utils/mongoshell
*/
/**
* Thread and ScopedThread are copied from parallelTester.js since we can't import that file
* (CountDownLatch, which comes from the same place, is handled separately in this file).
*
* Both constructors are only (re)defined when the shell exposes the _threadInject hook.
*/
if (typeof _threadInject != "undefined") {
// With --enableJavaScriptProtection functions are presented as Code objects.
// This function evals all the Code objects then calls the provided start function.
// arguments: [startFunction, startFunction args...]
// Returns whatever the start function returns.
function _threadStartWrapper() {
// Recursively evals all the Code objects present in arguments.
// Non-null values accepted by isObject() are rebuilt as a fresh array/object from their own
// properties; every other value is returned unchanged.
// NOTE: This is a naive implementation that cannot handle cyclic objects.
function evalCodeArgs(arg) {
if (arg instanceof Code) {
// Wrapping the source in parentheses makes eval parse it as an expression.
return eval("(" + arg.code + ")");
} else if (arg !== null && isObject(arg)) {
var newArg = arg instanceof Array ? [] : {};
for (var prop in arg) {
if (arg.hasOwnProperty(prop)) {
newArg[prop] = evalCodeArgs(arg[prop]);
}
}
return newArg;
}
return arg;
}
var realStartFn;
var newArgs = [];
for (var i = 0, l = arguments.length; i < l; i++) {
newArgs.push(evalCodeArgs(arguments[i]));
}
// The first converted argument is the function to run, the remainder are its arguments.
realStartFn = newArgs.shift();
return realStartFn.apply(this, newArgs);
}
// Thread(startFunction, args...): prepends _threadStartWrapper to the arguments and forwards
// everything to this.init (init is not defined in this file; presumably _threadInject
// installs it on the prototype -- verify against the shell implementation).
Thread = function() {
var args = Array.prototype.slice.call(arguments);
args.unshift(_threadStartWrapper);
this.init.apply(this, args);
};
_threadInject(Thread.prototype);
// ScopedThread takes the same arguments as Thread and forwards them the same way.
ScopedThread = function() {
var args = Array.prototype.slice.call(arguments);
args.unshift(_threadStartWrapper);
this.init.apply(this, args);
};
// ScopedThread inherits from a Thread instance that was built with a no-op start function,
// then gets its own hooks from _scopedThreadInject (called without a typeof guard).
ScopedThread.prototype = new Thread(function() {});
_scopedThreadInject(ScopedThread.prototype);
}
/**
* To use a CountDownLatch, first create one in the parent thread and then pass it into
* a ScopedThread as a function argument:
* var latch = new CountDownLatch(1);
* var t = new ScopedThread(funcA, latch);
*
* You may then lower the count of the latch by calling latch.countDown() and get the value
* of the latch with latch.getCount() from any thread.
*
* The wrapper below is only installed when a CountDownLatch global already exists; it calls
* the _new, _await, _countDown and _getCount functions found on that object.
*/
if ( typeof CountDownLatch !== 'undefined' ) {
// NOTE(review): Object.extend presumably copies the properties of the existing
// CountDownLatch (second argument) onto the new constructor function -- confirm.
CountDownLatch = Object.extend(function(count) {
// Allow calling CountDownLatch(count) without `new`.
if (! (this instanceof CountDownLatch)) {
return new CountDownLatch(count);
}
// All constructor arguments are forwarded to CountDownLatch._new; its result is kept as the
// descriptor handed to the _await/_countDown/_getCount calls below.
this._descriptor = CountDownLatch._new.apply(null, arguments);
// NOTE: The following methods have to be defined on the instance itself,
// and not on its prototype. This is because properties on the
// prototype are lost during the serialization to BSON that occurs
// when passing data to a child thread.
this.await = function() {
CountDownLatch._await(this._descriptor);
};
this.countDown = function() {
CountDownLatch._countDown(this._descriptor);
};
this.getCount = function() {
return CountDownLatch._getCount(this._descriptor);
};
}, CountDownLatch);
}
/**
 * Helper function to run a workload in parallel.
 *
 * One ScopedThread is started per requested thread; each thread times a single call of
 * the workload. A warning (plus the raw timings) is printed when the spread between the
 * fastest and the slowest thread exceeds 5% of the fastest run time.
 *
 * @param {number} numThread - number of threads to start.
 * @param {function} workload - a function that runs a workload; called without arguments.
 *
 * @returns {array} the run time in milliseconds of each thread, in start order.
 */
var runWorkloadInParallel = function(numThread, workload) {
    // Executed inside each thread, so it must not rely on anything from this closure.
    var timeWorkload = function(workload) {
        var start = Date.now();
        workload();
        var end = Date.now();
        return end - start;
    };
    var threads = [];
    // The counter is declared locally so that it no longer leaks into (or clobbers) a
    // global variable named "i".
    for (var i = 0; i < numThread; i++) {
        var t = new ScopedThread(timeWorkload, workload);
        threads.push(t);
        t.start();
    }
    // Collect the timings only after every thread has been started.
    var durationsMs = threads.map(function(thread) {
        return thread.returnData();
    });
    var min = Math.min.apply(null, durationsMs);
    var max = Math.max.apply(null, durationsMs);
    if ((max - min) / min > 0.05) {
        print("WARNING: run times are inconsistent: ");
        printjson(durationsMs);
    }
    return durationsMs;
};
/**
 * Much simplified version of {@link https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Number/toLocaleString|toLocaleString}.
 * It only supports the US / EN grouping format.
 *
 * Only the integer part of each numeral is grouped; digits after a decimal point are left
 * untouched (1234.5678 becomes "1,234.5678", not "1,234.5,678").
 *
 * @param {number} number - a numeric object (it is converted with toString before being processed).
 * @param {string} [separator=,] - the separator to use between groups of digits (defaults to ',').
 *
 * @returns {string} the number with its integer digits grouped in threes.
 * For example, 1000 will result in the string "1,000"
 */
var localize = function(number, separator) {
    if (typeof separator === "undefined") {
        separator = ",";
    }
    // Matches every position inside a run of digits that is followed by a multiple of three digits.
    var groupBoundary = /\B(?=(\d{3})+\b)/g;
    // Visit each numeral (digits with an optional fractional part) on its own so that the
    // fractional digits can be excluded from grouping.
    return number.toString().replace(/\d+(?:\.\d+)?/g, function(numeral) {
        var dot = numeral.indexOf(".");
        if (dot === -1) {
            return numeral.replace(groupBoundary, separator);
        }
        return numeral.slice(0, dot).replace(groupBoundary, separator) + numeral.slice(dot);
    });
};
/*
 * Split jobs amongst a pool of threads.
 * Note: The 'this' pointer for the function will contain a unique (per pool)
 * threadId value. This can, for example, be used to seed the Random number
 * generator for each thread (to get a deterministic but different random number
 * stream for each thread).
 *
 * Input:
 * poolSize : a fixed number of threads to evaluate the jobs
 * jobs : an array of jobs, each job is itself an array of
 * - the function to call
 * - the parameters to the function
 * The jobs array itself is left untouched.
 * Return: an array with one json object per started thread (in thread id order), each
 * with the following fields
 * ok: 1 iff all jobs of that thread returned ok :1
 * duration: the amount of time in millis to run all the jobs of that thread
 * results: an array of the return values from each of the jobs. In the
 * event of an exception, the document is
 * {ok: 0, exception: exception instance}
 *
 */
var runJobsInPool = function(poolSize, jobs) {
    // Executed inside each thread, so it must not rely on anything from this closure.
    var runner = function(id, jobs) {
        var start = Date.now();
        var results = jobs.map(function(job) {
            try {
                var worker = job.shift();
                return worker.apply({threadId: id}, job);
            } catch (ex) {
                print(ex);
                print("Unexpected exception evaluating " + tojson(job));
                return {ok: 0, exception: ex};
            }
        });
        var end = Date.now();
        var duration = end - start;
        // A job that returns null or undefined counts as a failure instead of throwing here.
        var ok = results.every(function(r) {
            return r !== null && r !== undefined && r.hasOwnProperty('ok') && r.ok == 1;
        }) ? 1 : 0;
        return {ok: ok, duration: duration, results: results};
    };
    // At least one job per thread; this also keeps the loop below finite when poolSize
    // is negative.
    var chunk = Math.max(1, Math.ceil(jobs.length / poolSize));
    var pool = [];
    // Hand each thread its own slice instead of splicing the caller's array empty.
    for (var offset = 0; offset < jobs.length; offset += chunk) {
        var t = new ScopedThread(runner, pool.length, jobs.slice(offset, offset + chunk));
        pool.push(t);
        t.start();
    }
    return pool.map(function(t) { return t.returnData(); });
};
/**
 * Package a function call as a job: an array whose first element is the function to call
 * and whose remaining elements are the arguments to call it with. For example:
 * var job = createJob(print, "hello ", "world")
 * var job = createJob(sh.status,true)
 *
 * Any number of arguments may follow the function, so this is valid as well:
 * var job = createJob(print, "hello"," ", "world", " ", ", this is a longer example")
 *
 * @param {function} func - the function to be called as a job in a thread.
 * @param {any} arg1 - the first argument.
 * @param {any} arg2 - the second argument.
 *
 * @returns {array} the job to be invoked.
 */
var createJob = function(func, arg1, arg2) {
    // Copy every supplied argument, not only the named ones.
    var job = [];
    for (var position = 0; position < arguments.length; position++) {
        job.push(arguments[position]);
    }
    return job;
};
// Every reported result is also kept here for the final pretty printed table.
var __results = [];
/*
 * To report results in throughput
 *
 * Input:
 * test_name : string
 * throughput : float
 * result : optional JS object with the following fields (may be omitted or null)
 * - nThread : thread count
 * - pass : bool
 * - errMsg : error message if pass == false
 * trial : trial run true|false; nothing is printed for a trial run, but the result
 * is still recorded in __results
 */
var reportThroughput = function(test_name, throughput, result, trial) {
    // this is message for MC to parse
    var msg = ">>> " + test_name + " : " + throughput;
    // Compare against the value undefined: typeof yields a string, which is never equal to
    // undefined, so the old typeof test let an omitted result reach the "in" checks.
    if (result !== undefined && result !== null) {
        msg += " " + ("nThread" in result ? result.nThread : "NA");
        if ("pass" in result && result.pass === false) {
            // FIXME: need check to make sure errMsg is present.. and properly error out
            msg += " failed: " + result.errMsg;
        }
        // the pass case adds nothing to the message
    } else {
        result = null; // just set it to null
    }
    if (!trial) {
        print(msg);
    }
    // also keep the results for final pretty printed table
    __results.push({test_name: test_name, throughput: throughput, result: result});
};
/**
 * Tell whether the global isShardedCluster flag is set.
 *
 * @returns true if isShardedCluster is defined *and* truthy, false otherwise
 */
var sharded_cluster = function() {
    // typeof keeps this safe when the flag was never declared.
    return typeof isShardedCluster !== 'undefined' && Boolean(isShardedCluster);
};
/**
 * Tell whether the global isShardCollections flag is set.
 *
 * @returns true if isShardCollections is defined *and* truthy, false otherwise
 */
var shard_collections = function() {
    // typeof keeps this safe when the flag was never declared.
    return typeof isShardCollections !== 'undefined' && Boolean(isShardCollections);
};
/**
 * Tell whether the global isRepl flag is set.
 *
 * @returns true if isRepl is defined *and* truthy, false otherwise
 */
var isReplSet = function() {
    // typeof keeps this safe when the flag was never declared.
    return typeof isRepl !== 'undefined' && Boolean(isRepl);
};
// Helpers to initialize sharding
/**
 * Check whether sharding is enabled for a database by looking for a config.databases
 * entry with the database's name and partitioned: true.
 *
 * @param {object} d - the database reference to check.
 * @returns {boolean} true if such an entry exists.
 */
function isDatabaseSharded(d) {
    // Both values are locals now; they used to be assigned without a declaration and
    // therefore leaked into (or overwrote) the globals "config" and "count".
    var configDB = d.getSiblingDB("config");
    var matches = configDB.databases.find({"_id": d.getName(), "partitioned": true}).count();
    return matches > 0;
}
// Enable sharding on database d by sending it the enableSharding admin command.
// Nothing is sent when isDatabaseSharded(d) reports that the db is already sharded.
function enableSharding(d) {
    if (!isDatabaseSharded(d)) {
        var command = {enableSharding: d.toString()};
        var response = d.adminCommand(command);
        assert.commandWorked(response);
    }
}
/**
 * Shard a collection, retrying the shardCollection admin command once per second until
 * it reports ok. Asserts if it still has not succeeded after 20 attempts.
 *
 * @param {object} d - database reference used to send the admin command.
 * @param {object} coll - the collection to shard.
 * @param {object} [opts] - optional settings; this object is not modified.
 * @param {string} [opts.shardKey="_id"] - the field to shard on.
 * @param {any} [opts.shardKeyType="hashed"] - the index type for the shard key.
 */
function shardCollection(d, coll, opts) {
    opts = opts || {};
    // Read the defaults into locals rather than writing them back into the caller's object.
    var shardKey = opts.shardKey || "_id";
    var shardKeyType = opts.shardKeyType || "hashed";
    var fullName = coll.getFullName();
    print("Sharding collection " + fullName);
    var shardConfig = {
        shardCollection: fullName,
        key: {}
    };
    shardConfig.key[shardKey] = shardKeyType;
    var secondsWaited = 0;
    var response = d.adminCommand(shardConfig);
    while (!response.ok) {
        print("Waiting for shardCollection to finish...");
        sleep(1000);
        secondsWaited++;
        // Include the last server response so a timeout can actually be diagnosed.
        assert(secondsWaited < 20,
               "shardCollection didn't succeed in 20 seconds. Last response: " + tojson(response));
        response = d.adminCommand(shardConfig);
    }
    sh.status();
    // Note: If shardKeyType is something else than hashed, all chunks are now on the primary shard
    // and the balancer is probably off too. We should add a function to move them around.
    // It seems for now, all tests use hashed shard keys anyways.
}
/**
 * waitForStates blocks until all nodes in a replset reach one of the desired states or a time
 * limit is reached. This function calls
 * {@link https://docs.mongodb.com/manual/reference/command/replSetGetStatus/|replSetGetStatus}
 * on _db.
 *
 * Errors / Exceptions are ignored, the function will continue to wait. This case covers the
 * scenario where the process represented by _db has not yet been added to the replica set.
 *
 * This function can be used to wait for the primary to discover that all other
 * members of the replica set are in the correct state. For example,
 * [workloads/initialsync.js]{@link module:workloads/initialsync}
 * uses this method to block at the end of the test. Otherwise the final tests may fail.
 *
 * @param {object} _db - A database reference to a replica set member.
 * @param {int} [time_limit_millis=3600000] - The maximum amount of time to wait. The floor
 * value is 1.1 seconds.
 * @param {...string} [states=["PRIMARY", "SECONDARY", "ARBITER"]] - Zero or more states.
 * @returns the amount of time the function waited in millis. An assertion fails if the
 * states were not reached within the time limit.
 */
function waitForStates(_db, time_limit_millis, ...states) {
    var done = false;
    var start_time = Date.now();
    time_limit_millis = time_limit_millis || 60 * 60 * 1000;
    if (time_limit_millis < 1100) {
        // use 1100 so that the sleep(1000) allows for more than one check
        time_limit_millis = 1100;
    }
    if (states.length === 0) {
        states = ["PRIMARY", "SECONDARY", "ARBITER"];
    }
    print("waitForStates: " + tojson(states));
    // Report the starting status of the member we were asked about (not of the shell's
    // global connection). Like the polling below, a failure here is informational only.
    try {
        print("waitForStates: start " + tojson(_db.adminCommand({replSetGetStatus: 1})));
    } catch (err) {
        print("replSetGetStatus error with [" +
              err +
              "]. This is informational only.");
    }
    var status;
    while (!done && Date.now() - start_time < time_limit_millis) {
        try {
            status = assert.commandWorked(_db.adminCommand({replSetGetStatus: 1}));
            done = status.members.every(function(member) {
                return states.includes(member.stateStr);
            });
        } catch (err) {
            print("replSetGetStatus error with [" +
                  err +
                  "]. This is informational only.");
        }
        if (!done) {
            sleep(1000);
        }
    }
    assert(done, "expected to reach done state in time limit : " + tojson(status));
    return Date.now() - start_time;
}
/**
 * waitOplog wait for secondary to catch up with the primary. This function calls
 * {@link https://docs.mongodb.com/manual/reference/command/replSetGetStatus/|replSetGetStatus}.
 * If there are no members returned then this function will return without waiting. It
 * is likely that this case will cover the scenario where the process represented by
 * replSetDB has not yet been added to the replica set.
 *
 * The wait itself is a journaled insert into Op.wait_oplog whose write concern covers
 * every data bearing member (arbiters are not counted, they cannot acknowledge writes).
 *
 * @param {object} [replSetDB=db] - a database reference to a replica set member.
 * @returns {boolean} true if there was nothing to wait for or the insert succeeded; false
 * if a member is in an unexpected state, replSetDB is not the primary, or the insert did
 * not report exactly one inserted document.
 */
function waitOplog(replSetDB) {
    if (typeof replSetDB === 'undefined') {
        replSetDB = db;
    }
    var members = replSetDB.adminCommand("replSetGetStatus").members;
    var opDB = replSetDB.getSiblingDB("Op");
    var c = opDB.wait_oplog;
    // Only do the check if repl set
    if (!members) {
        return true;
    }
    // first check to make sure all members are in normal operating mode
    var dataBearingMembers = 0;
    for (var j = 0; j < members.length; j++) {
        var member = members[j];
        if (member.stateStr === "ARBITER") {
            continue;
        }
        if (member.stateStr !== "PRIMARY" && member.stateStr !== "SECONDARY") {
            // neither PRIMARY, SECONDARY nor ARBITER, fail this
            print("ERROR: replSet member " + member.name + " state is: " + member.stateStr);
            return false;
        }
        dataBearingMembers++;
    }
    if (!replSetDB.isMaster().ismaster) {
        print("ERROR: Not connected to the primary");
        return false;
    }
    // print out to get lags
    replSetDB.printSlaveReplicationInfo();
    print("Before insert with w:all");
    // Declared locally; this used to be an undeclared assignment that leaked a global "r".
    var r = c.insert({"x": "flush"}, {writeConcern: {w: dataBearingMembers, j: true}});
    print("After insert with w:all. nInserted=" + r.nInserted);
    printjson(r);
    replSetDB.printSlaveReplicationInfo();
    return r.nInserted === 1;
}
/**
 * waitOplogSharded runs waitOplog against every shard listed in config.shards (visited
 * in _id order) and stops at the first shard for which it fails.
 *
 * @param {object} [targetDB=db] - a database reference to a mongos process.
 * @returns {boolean} true if waitOplog succeeded on every shard, false otherwise.
 */
function waitOplogSharded(targetDB) {
    if (typeof targetDB === 'undefined') {
        targetDB = db;
    }
    var shardDocs = targetDB.getSisterDB("config").shards.find().sort({_id: 1}).toArray();
    for (var idx = 0; idx < shardDocs.length; idx++) {
        var shardDoc = shardDocs[idx];
        // Open a direct connection to the shard and do the w:all write there
        var shardAdminDB = new Mongo(shardDoc.host).getDB("admin");
        print("Running waitOplog on shard: " + tojson(shardDoc));
        if (waitOplog(shardAdminDB)) {
            continue;
        }
        print("Failed to run waitOplog on shard: " + tojson(shardDoc));
        return false;
    }
    return true;
}
/**
 * Helper function to quiesce the system between tests: first wait for the oplog to catch
 * up (waitOplogCheck), then flush to disk (doFSync). *targetDB* can be a standalone
 * mongod, a replica set member or a mongos process; both helpers pick the appropriate
 * actions based on sharded_cluster() and isReplSet().
 * @see sharded_cluster
 * @see isReplSet
 *
 * @param {object} [targetDB=db] - a database reference to a mongo process.
 * @returns {boolean} false (without running doFSync) if waitOplogCheck failed, true otherwise.
 */
var quiesceSystem = function(targetDB) {
    if (typeof targetDB === 'undefined') {
        targetDB = db;
    }
    var oplogCaughtUp = waitOplogCheck(targetDB);
    if (oplogCaughtUp) {
        doFSync(targetDB);
        return true;
    }
    print("ERROR: waitOplogCheck() failed in quiesceSystem(). Returning false and not running doFSync().");
    return false;
};
/**
 * Helper function to flush to disk. *targetDB* can be a standalone mongod, a replica set
 * member or a mongos process:
 * - sharded_cluster() is true: delegate to doFSyncSharded
 * - otherwise isReplSet() is true: delegate to doFSyncReplicaSet
 * - otherwise: send a plain fsync admin command to targetDB
 *
 * @see sharded_cluster
 * @see isReplSet
 * @param {object} [targetDB=db] - a database reference to a mongo process.
 */
var doFSync = function(targetDB) {
    if (typeof targetDB === 'undefined') {
        targetDB = db;
    }
    if (sharded_cluster()) {
        doFSyncSharded(targetDB);
        return;
    }
    if (isReplSet()) {
        doFSyncReplicaSet(targetDB);
        return;
    }
    targetDB.adminCommand({fsync: 1});
};
/**
 * For a sharded cluster, call doFSyncReplicaSet on every shard listed in config.shards
 * and then on the config servers (located through the getShardMap admin command).
 *
 * @param {object} [targetDB=db] - the database reference to a mongos process.
 */
var doFSyncSharded = function(targetDB) {
    if (typeof targetDB === 'undefined') {
        targetDB = db;
    }
    // The sort mirrors waitOplogSharded. It may not be needed, but it makes the shards
    // get visited in the same order each time.
    var shardDocs = targetDB.getSisterDB("config").shards.find().sort({_id: 1}).toArray();
    for (var idx = 0; idx < shardDocs.length; idx++) {
        // Open a direct connection to the shard and fsync its replica set
        var shardConn = new Mongo(shardDocs[idx].host);
        doFSyncReplicaSet(shardConn.getDB("admin"));
    }
    // Finally the config servers
    var shardMap = targetDB.adminCommand({"getShardMap": 1});
    var configConn = new Mongo(shardMap.map.config);
    doFSyncReplicaSet(configConn.getDB("admin"));
};
/**
 * Perform an fsync on each member reported by replSetGetStatus.
 *
 * If replSetGetStatus does not report ok, a warning is printed. If the reply carries no
 * member list (for example because the replica set was not initiated), no fsync is sent
 * at all.
 *
 * @param {object} [replSetDB=db] - the database reference to a replica set member.
 */
var doFSyncReplicaSet = function(replSetDB) {
    if (typeof replSetDB === 'undefined') {
        replSetDB = db;
    }
    var status = replSetDB.adminCommand("replSetGetStatus");
    if (!status.ok) {
        print("WARNING: doFSyncReplicaSet failed, replSetGetStatus[" + replSetDB.getMongo()
              +"] status=" + tojson(status));
    }
    var members = status.members;
    // Nothing to do unless a member list came back
    if (!members) {
        return;
    }
    for (var idx = 0; idx < members.length; idx++) {
        // Open a direct connection to the member and fsync it
        var memberAdminDB = new Mongo(members[idx].name).getDB("admin");
        memberAdminDB.runCommand({fsync: 1});
    }
};
/**
 * Wait for the oplog to catch up by delegating to waitOplogCheck_real. If that call
 * throws, the error is printed and the call is repeated exactly once; an exception from
 * the second attempt propagates to the caller.
 *
 * waitOplogCheck_real in turn calls waitOplogSharded() or waitOplog() based on the return
 * values of sharded_cluster() and isReplSet().
 *
 * @see waitOplogSharded
 * @see waitOplog
 * @see sharded_cluster
 * @see isReplSet
 * @param {object} [targetDB=db] - the database reference to a mongo process.
 * @returns whatever waitOplogCheck_real returns.
 */
function waitOplogCheck(targetDB) {
    if (typeof targetDB === 'undefined') {
        targetDB = db;
    }
    // The connection gets closed after some long running tests (large_initialsync.js),
    // hence the single retry.
    var firstFailure;
    try {
        return waitOplogCheck_real(targetDB);
    } catch (err) {
        firstFailure = err;
    }
    print(firstFailure);
    print("waitOplogCheck() failed. Retrying one more time...");
    return waitOplogCheck_real(targetDB);
}
/**
 * Single attempt at waiting for the oplog: uses waitOplogSharded when sharded_cluster()
 * is true, otherwise waitOplog when isReplSet() is true, otherwise there is nothing to
 * wait for.
 *
 * @param {object} [targetDB=db] - the database reference to a mongo process.
 * @returns the result of the selected helper, or true when neither flag is set.
 */
function waitOplogCheck_real(targetDB) {
    var target = typeof targetDB === 'undefined' ? db : targetDB;
    if (sharded_cluster()) {
        return waitOplogSharded(target);
    }
    if (isReplSet()) {
        return waitOplog(target);
    }
    return true;
}
/*
 * Look up the host string of the primary shard of a database: read the database's
 * "primary" from config.databases, then that shard's "host" from config.shards.
 * The lookup goes through the shell's global db.
 *
 * Preconditions:
 * This is run against a mongos in a sharded environment
 */
var findPrimaryShard = function(db_name) {
    var configDB = db.getSisterDB("config");
    var databaseEntry = configDB.databases.findOne({"_id": db_name});
    var shardEntry = configDB.shards.findOne({"_id": databaseEntry.primary});
    return shardEntry.host;
};
/*
 * Convert numbers into human readable form: values of at least a thousand / million /
 * billion are rounded to the nearest whole unit and suffixed with K / M / B. Anything
 * smaller is returned unchanged (as the value that was passed in, not as a string).
 */
var humanReadableNumber = function(n) {
    // Largest unit first, so the first match wins.
    var units = [
        [1000000000, "B"],
        [1000000, "M"],
        [1000, "K"]
    ];
    for (var idx = 0; idx < units.length; idx++) {
        var scaled = n / units[idx][0];
        if (scaled >= 1) {
            return Math.round(scaled) + units[idx][1];
        }
    }
    return n;
};
// Convert n to a number, mapping anything that is not a number (undefined, for instance,
// converts to NaN rather than 0) to 0.
function toNumber(n) {
    if (isNaN(n)) {
        return 0;
    }
    return Number(n);
}
// from <mongodb>/jstests/libs/check_log.js
/*
 * Helper functions which connect to a server, and check its logs for particular strings.
 */
var checkLog;
(function() {
    "use strict";
    if (checkLog) {
        return;  // Protect against this file being double-loaded.
    }
    checkLog = (function() {
        /*
         * Fetches the 'global' log from 'conn' with the getLog admin command. Returns the array
         * of log lines, or null if sending the command threw (so that callers can retry).
         * A reply that arrives but reports failure makes assert.commandWorked fail.
         */
        var getGlobalLog = function(conn) {
            var cmdRes;
            try {
                cmdRes = conn.adminCommand({getLog: 'global'});
            } catch (e) {
                // Retry with network errors.
                print("checkLog ignoring failure: " + e);
                return null;
            }
            return assert.commandWorked(cmdRes).log;
        };
        /*
         * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until
         * the provided 'msg' is found in the logs, or 5 minutes have elapsed. Throws an exception
         * on timeout.
         */
        var contains = function(conn, msg) {
            assert.soon(
                function() {
                    var logMessages = getGlobalLog(conn);
                    if (logMessages === null) {
                        return false;
                    }
                    for (var i = 0; i < logMessages.length; i++) {
                        if (logMessages[i].indexOf(msg) != -1) {
                            return true;
                        }
                    }
                    return false;
                },
                'Could not find log entries containing the following message: ' + msg,
                5 * 60 * 1000,
                300);
        };
        /*
         * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until
         * the provided 'msg' is found in the logs exactly 'expectedCount' times, or 5 minutes have
         * elapsed.
         * Throws an exception on timeout.
         */
        var containsWithCount = function(conn, msg, expectedCount) {
            assert.soon(
                function() {
                    var logMessages = getGlobalLog(conn);
                    if (logMessages === null) {
                        return false;
                    }
                    // Count from scratch on every poll. Each poll re-reads the whole log, so a
                    // counter shared between polls counts the same lines again and again and
                    // overshoots the expected value after the first unsuccessful poll.
                    var count = 0;
                    for (var i = 0; i < logMessages.length; i++) {
                        if (logMessages[i].indexOf(msg) != -1) {
                            count++;
                        }
                    }
                    return expectedCount === count;
                },
                // The message is built before the first poll, so it cannot report an observed
                // count (the previous text always claimed to have seen 0 entries).
                'Expected ' + expectedCount +
                    ' log entries containing the following message: ' + msg,
                5 * 60 * 1000,
                300);
        };
        return {
            getGlobalLog: getGlobalLog,
            contains: contains,
            containsWithCount: containsWithCount
        };
    })();
})();
// from <mongodb>/jstests/libs/write_concern_utils.js
// Stops replication on the given server(s). A value with a non-zero length is treated as
// a list of connections and each entry is handled in turn.
function stopServerReplication(conn) {
    if (conn.length) {
        conn.forEach(function(member) {
            stopServerReplication(member);
        });
        return;
    }
    print("stopServerReplication called for " + conn);
    // Start from an empty ramlog; otherwise checkLog could match messages left over from
    // an earlier activation of this fail point.
    var clearResult = conn.adminCommand({clearLog: 'global'});
    assert.commandWorked(clearResult);
    var errMsg = 'Failed to enable stopReplProducer failpoint.';
    var failPointResult = conn.adminCommand({configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'});
    assert.commandWorked(failPointResult, errMsg);
    print("stopServerReplication wait for " + conn);
    // Block until the server logs that the fail point was actually hit.
    checkLog.contains(conn, 'bgsync - stopReplProducer fail point enabled');
}
/**
 * Runs a command in the bash shell.
 *
 * @param {string} cmd - the command line handed to `bash -c`.
 * @returns whatever runProgram returns (presumably the exit code of the command -- see
 * the shell's runProgram), so that callers can detect a failed command.
 */
var runCmd = function(cmd) {
    return runProgram('bash', '-c', cmd);
};
/**
 * Runs a list of semicolon separated bash commands on the specified host through ssh.
 *
 * NOTE: cmds is placed between double quotes on the ssh command line without any
 * escaping, so it must not itself contain unescaped double quotes.
 *
 * @param {string} host - the ssh destination.
 * @param {string} cmds - the commands to run on the host.
 * @returns whatever runCmd returns.
 */
var runCmdOnTarget = function(host, cmds) {
    // Declared locally; this used to be an undeclared assignment that leaked a global.
    var hostSsh = 'ssh -A -o StrictHostKeyChecking=no ' + host;
    return runCmd(hostSsh + ' "' + cmds + '"');
};
/**
 * Joins a host address and a port with a colon.
 *
 * @returns a string of format <host_addr>:<port>
 */
var hostWithPort = function(host_addr, port) {
    var prefix = host_addr + ':';
    return prefix + port;
};
/**
 * Attempt to establish a new Mongo connection to the mongod instance running on the
 * specified host, trying several times through assert.retry.
 *
 * @param {string} host - the host (and port) to connect to.
 * @param {number} [retries=2] - passed to assert.retry as the number of attempts.
 * @returns a reference to the admin database upon successful connection. If no attempt
 * succeeds, assert.retry is expected to throw with a message naming the host.
 */
var getAdminDB = function(host, retries) {
    var attempts = retries || 2;
    var admin_db;
    assert.retry(function() {
        try {
            var conn = new Mongo(host);
            admin_db = conn.getDB("admin");
            return true;
        } catch (e) {
            print(e);
            return false;
        }
    // The message names the host that could not be reached; the attempt count used to be
    // appended in its place by mistake.
    }, "Error Connecting to admin database on host " + host + " after " + attempts + " attempts",
    attempts);
    return admin_db;
};
/**
 * Body of the keep-alive thread: as long as the latch count is above zero, sleep for
 * intervalMillis and then invoke printMsgFnCbk with cbkArgs.
 *
 * @param {number} intervalMillis - pause between two callback invocations.
 * @param {object} keepAliveEndCounter - latch whose getCount() is polled; the loop ends
 * once it is no longer above zero.
 * @param {function} printMsgFnCbk - the callback to invoke.
 * @param {...any} cbkArgs - arguments passed to the callback.
 */
var keepAliveFn = function(intervalMillis, keepAliveEndCounter, printMsgFnCbk, ...cbkArgs) {
    for (;;) {
        if (!(keepAliveEndCounter.getCount() > 0)) {
            break;
        }
        sleep(intervalMillis);
        printMsgFnCbk(...cbkArgs);
    }
};
/**
 * Starts a keep-alive thread which runs the "printMsgFnCbk" callback function with the
 * provided cbkArgs every intervalMillis (30 minutes when intervalMillis is not given).
 *
 * Nothing is started unless keepAliveNeeded is truthy and printMsgFnCbk is a function.
 *
 * @returns {array} [thread, latch] for the started keep-alive thread, to be handed to
 * keepAliveStop; [null, null] if nothing was started.
 */
var keepAliveStartifNeeded = function(keepAliveNeeded, printMsgFnCbk, intervalMillis, ...cbkArgs) {
    var shouldStart = keepAliveNeeded && typeof printMsgFnCbk === "function";
    if (!shouldStart) {
        return [null, null];
    }
    var period = intervalMillis || 30 * 60 * 1000;
    // The thread keeps running until this latch is counted down.
    var latch = new CountDownLatch(1);
    var worker = new ScopedThread(keepAliveFn, period, latch, printMsgFnCbk, ...cbkArgs);
    worker.start();
    return [worker, latch];
};
/**
 * Stops the keep-alive thread: counts the latch down and then joins the thread.
 * Does nothing when no thread is given (keepAliveThread is falsy).
 */
var keepAliveStop = function(keepAliveThread, keepAliveEndCounter) {
    if (!keepAliveThread) {
        return;
    }
    keepAliveEndCounter.countDown();
    keepAliveThread.join();
};