/**
* @file
* Test performance of chunk migrations under load
* <br>
*
* ### *Test*
*
* Move one chunk, containing 25k documents, from one shard to another.
*
* While that chunk is migrating, benchRun is used to apply a mixed load to the documents in the
* non-migrating chunks.
*
* Throughput of the background load is also reported in metrics starting with "bg_". The goal
* of this test is for the migration speed to be as high as possible while impacting the
* background load as little as possible.
*
* Unless the user specifies otherwise, the test uses the default value for
* {@link https://docs.mongodb.com/manual/core/sharding-balancer-administration/index.html#chunk-migration-and-replication|_secondaryThrottle}
* (false for WiredTiger and true for MMAPv1), and sets
* {@link https://docs.mongodb.com/manual/tutorial/manage-sharded-cluster-balancer/#wait-for-delete|_waitForDelete}
* to true.
*
* The test takes an optional 'thread_levels' parameter: an array specifying the number of threads to use for the background load.
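*
* A minimal sketch of overriding these settings (the values are read by the variable
* declarations near the top of this file; how they are injected depends on the test runner):
*
* ```js
* secondary_throttle = true;   // passed to moveChunk as _secondaryThrottle
* wait_for_delete = false;     // passed to moveChunk as _waitForDelete
* thread_levels = [0, 16];     // background load thread counts, each divisible by 4
* ```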
*
* Uses the
* {@link https://docs.mongodb.com/manual/core/sharding-balancer-administration/#chunk-migration-procedure|moveChunk}
* command.
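*
* For illustration, the admin command issued for each migration (built by moveChunkTo below)
* looks roughly like this; the namespace and destination shard are placeholders, and
* _secondaryThrottle is added only when explicitly configured:
*
* ```js
* db.adminCommand({
*     moveChunk: "mctest.<collection>",
*     find: {id: 0},
*     to: "<destination shard id>",
*     _waitForDelete: true
* });
* ```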
*
* Results are reported as docs moved / sec.
*
* ### *Setup*
*
* Requires a sharded cluster.
*
* The test inserts 25k documents per chunk (1k per chunk when the cluster has five or more shards), split evenly into
* one more chunk than there are shards. One chunk is permanently moved to each of the non-primary shards, leaving two
* chunks on the primary shard; the chunk containing id 0 is the one the test moves around. (The test caps the number
* of moves at five to avoid a large number of migrations when the cluster has many shards.)
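*
* For example, in a three-shard cluster the initial chunk layout is roughly:
*
* - shard 0 (primary): chunks [MinKey, 25000) and [25000, 50000); the first of these contains id 0 and is the one moved around
* - shard 1: chunk [50000, 75000)
* - shard 2: chunk [75000, MaxKey)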
*
* Field a is indexed and contains uniformly random integers.
*
* ### *Notes*
*
* The load that is used here only as background load was subsequently extracted into its own
* test in {@link module:workloads/mix}.
*
* ### *Owning-team*
* mongodb/sharding
*
* @module workloads/move_chunk_with_load
*/
/* global db sharded_cluster sh printjson assert reportThroughput sleep Random tojson CountDownLatch ScopedThread benchStart benchFinish quiesceSystem jsTest Mongo server username password authEnabled */
var default_val = "default";
/**
* The value for the _secondaryThrottle parameter to the moveChunk command.
*/
var secondary_throttle = secondary_throttle === undefined ? default_val : secondary_throttle;
/**
* The value for the _waitForDelete parameter to the moveChunk command.
*/
var wait_for_delete = wait_for_delete === undefined ? true : wait_for_delete;
/**
* The number of threads to run in parallel. The default is [0, 4, 64, 128, 256].
* **Note:** Each thread count should be divisible by 4, because the load is split evenly across
* the four benchRun operation types (insert, remove, update, findOne).
*/
var thread_levels = thread_levels || [0, 4, 64, 128, 256];
(function () {
if (!sharded_cluster()) {
print("move_chunk_with_load will only run in a sharded cluster.\nSkip...");
return;
}
sh.stopBalancer();
var dbName = "mctest";
var _db = db.getSiblingDB(dbName);
var config = _db.getSiblingDB("config");
var shards = config.shards.find().toArray();
var numShards = shards.length;
var shard_ids = [];
shards.forEach(function (shard) {
shard_ids.push(shard._id);
});
Random.setRandomSeed(0);
var chunkSize = numShards < 5 ? 25000 : 1000; // 25K docs per chunk, 1K if the number of shards is large
var numberDoc = chunkSize * (numShards + 1); // one chunk remains on each shard and one extra is moved around
var longStr = "";
for (var i = 0; i < 100; i++) {
longStr += "a";
}
assert.commandWorked(_db.adminCommand({
enableSharding: _db.toString()
}));
_db.adminCommand({
movePrimary: _db.toString(),
to: shard_ids[0]
});
function initColl(coll) {
var shardConfig = {
shardCollection: coll.getFullName(),
key: {
id: 1
}
};
while (!_db.adminCommand(shardConfig).ok) {
// Retry until the shardCollection command succeeds.
}
// Create n + 1 chunks (n = number of shards)
for (i = 1; i <= numShards; i++) {
sh.splitAt(coll.getFullName(), {
id: chunkSize * i
});
}
// Leave 2 chunks on shard 0 and move one each to the remaining shards.
for (i = 1; i < numShards; i++) {
var idToMove = (chunkSize * (i + 1));
assert.commandWorked(_db.adminCommand({
moveChunk: coll.getFullName(),
find: { id: idToMove },
to: shard_ids[i]
}));
}
// Insert a lot of documents evenly into the chunks.
var bulk = coll.initializeUnorderedBulkOp();
for (i = 0; i < numberDoc; i++) {
bulk.insert({
id: i,
a: Random.randInt(1000000), // indexed
c: longStr // not indexed
});
}
bulk.execute();
// Index a non-id key.
coll.createIndex({
a: 1
});
}
// Move the chunk containing id 0 from its current shard to the destination shard.
function moveChunkTo(ns, toShard) {
var d1 = Date.now();
var moveChunkCommand = {
moveChunk: ns,
find: { id: 0},
to: toShard
};
// Use default secondaryThrottle and waitForDelete unless it was explicitly specified in the config.
if (secondary_throttle !== default_val) {
moveChunkCommand["_secondaryThrottle"] = secondary_throttle;
}
moveChunkCommand["_waitForDelete"] = wait_for_delete;
var res = _db.adminCommand(moveChunkCommand);
var d2 = Date.now();
print("moveChunk result: " + tojson(res));
assert(res.ok);
return d2 - d1;
}
// Creates background load with BenchRun while the moveChunk is happening.
function startLoad(ns, nThreads) {
// We don't want to insert or remove from the chunk we're moving since that will change the
// number of documents to move. We don't want to update the chunk we're moving either
// because that makes the catchup phase never finish.
var minWriteId = chunkSize;
var benchRuns = [];
var batchSize = 250; // number of documents per vectored insert
var docSize = 100; // target document size in bytes; the server-added _id field makes the stored document slightly larger
// Make a document of slightly more than the given size.
function makeDocument(docSize) {
var doc = { id: minWriteId + Random.randInt(numberDoc - minWriteId),
a: Random.randInt(1000000),
c: "" };
// Pad the unindexed field c until the document reaches the requested BSON size.
while (Object.bsonsize(doc) < docSize) {
doc.c += "x";
}
return doc;
}
// Make the document array to insert
var docs = [];
for (var i = 0; i < batchSize; i++) {
docs.push(makeDocument(docSize));
}
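// Four background operations, each run in its own benchRun with nThreads parallel threads:
// batched inserts, multi-document removes, updates that touch the indexed and padded fields,
// and point findOne queries.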
var ops = [{ op: "insert",
writeCmd: true,
ns: ns,
doc: docs },
{ op: "remove",
writeCmd: true,
multi: true,
ns: ns,
query: { id: { "#RAND_INT" : [ minWriteId, numberDoc ]}}},
{ op: "update",
writeCmd: true,
ns: ns,
query: { id: { "#RAND_INT" : [ minWriteId, numberDoc ]}},
update: { $inc: { a: 1 },
$set: { c: { "#RAND_STRING": [ { "#RAND_INT": [ 1, 100 ]}]}}}},
{ op: "findOne",
readCmd: true,
ns: ns,
query: { id: { "#RAND_INT" : [ 0, numberDoc ]}}}];
ops.forEach(function(op) {
benchRuns.push(benchStart({ "ops": [op], "parallel": nThreads, "host": server, "username": username, "password": password }));
});
return benchRuns;
}
// Calls serverStatus, requesting the rangeDeleter section and omitting sections that are not needed.
function getServerStatus(db) {
return db.serverStatus({ rangeDeleter: 1,
metrics: 0,
tcmalloc: 0,
sharding: 0,
network: 0,
connections: 0,
asserts: 0,
extra_info: 0 });
}
// Collects the serverStatus every 2 seconds until the provided CountDownLatch hits 0.
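// It runs in a ScopedThread (a separate JS scope), so it repeats the serverStatus arguments
// inline rather than calling getServerStatus.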
function collectServerStatus(stopCounter) {
var output = [];
while (stopCounter.getCount() > 0) {
output.push(db.serverStatus({ rangeDeleter: 1,
metrics: 0,
tcmalloc: 0,
sharding: 0,
network: 0,
connections: 0,
asserts: 0,
extra_info: 0 }));
sleep(2000);
}
return output;
}
function getTestName() {
return "moveChunkWithLoad_secondaryThrottle_" + secondary_throttle + "_waitForDelete_" + wait_for_delete;
}
// Takes 2 server status responses and makes an object with relevant throughputs between them.
function getStatusDiffThroughput(status1, status2){
var secDiff = (status2.localTime - status1.localTime) / 1000;
var throughput = {
time: status2.localTime,
insert: (status2.opcounters.insert - status1.opcounters.insert) / secDiff,
query: (status2.opcounters.query - status1.opcounters.query) / secDiff,
update: (status2.opcounters.update - status1.opcounters.update) / secDiff,
delete: (status2.opcounters.delete - status1.opcounters.delete) / secDiff,
command: (status2.opcounters.command - status1.opcounters.command) / secDiff
};
return throughput;
}
/**
* Converts results of background load serverStatus monitoring into two forms.
* It creates a list of the throughputs at each time interval and then puts those into
* a map from operation type to a list of throughputs.
*/
function convertStatusResultsToThroughput(results) {
var throughputs = [];
var timeline = { time: [], insert: [], query: [], update: [], delete: [], command: [] };
for(var i = 1; i < results.length; i++) {
var throughput = getStatusDiffThroughput(results[i-1], results[i]);
throughputs.push(throughput);
timeline.time.push(throughput.time);
timeline.insert.push(throughput.insert);
timeline.query.push(throughput.query);
timeline.update.push(throughput.update);
timeline.delete.push(throughput.delete);
timeline.command.push(throughput.command);
}
// TODO: This log could be useful, but the parser currently does not handle it properly.
// When the parser is fixed, uncomment this line.
// print("Throughputs: " + tojsononeline(throughputs));
print("Timeline: " + tojson(timeline));
return timeline;
}
function findMedian(values, description) {
var median;
values.sort(function(a,b){return a-b;});
var half = Math.floor(values.length/2);
if(values.length % 2) {
median = values[half];
} else {
median = (values[half-1] + values[half]) / 2.0;
}
print(description + ": " + tojson(values));
return median;
}
function testMoveChunkWithLoad(collName, nThreads) {
var coll = _db[collName];
initColl(coll);
quiesceSystem();
sleep(500);
sh.status(); // check shard status before test run
var nLogThreads = nThreads * 4;
jsTest.log("Test moveChunk with\t " +
" secondaryThrottle=" + secondary_throttle +
" nThreads=" + nLogThreads);
var moveChunkEndCounter = new CountDownLatch(1);
var serverStatusThread = new ScopedThread(collectServerStatus, moveChunkEndCounter);
serverStatusThread.start();
var benchRunTasks;
if (nThreads > 0) {
print("Starting load... ");
benchRunTasks = startLoad(coll.getFullName(), nThreads);
}
// Let load start before collecting baseline.
sleep(2000);
// Record baseline throughput with load but no moveChunk.
var beforeBaseStatus = getServerStatus(db);
sleep(5000);
var afterBaseStatus = getServerStatus(db);
var bgBaseThroughput = getStatusDiffThroughput(beforeBaseStatus, afterBaseStatus);
var bgInsertThroughputs = [];
var bgQueryThroughputs = [];
var throughputs = [];
// The last move in this loop moves the chunk back to the first shard (rs0).
// We limit the number of moves in case the cluster has many shards.
var maxMoves = numShards < 5 ? numShards : 5;
for (var i = 1; i <= maxMoves; i++) {
var toShard = shard_ids[i % maxMoves];
// Let the system settle after the moveChunk, logging the background throughput.
var beforeBetweenStatus = getServerStatus(db);
sleep(6000);
var afterBetweenStatus = getServerStatus(db);
var bgBetweenThroughput = getStatusDiffThroughput(beforeBetweenStatus,
afterBetweenStatus);
jsTest.log("Background Throughput Before Migration: " + tojson(bgBetweenThroughput));
// Perform the actual moveChunk, recording background throughput before and after.
var beforeStatus = getServerStatus(db);
print("Starting moveChunk... ");
var moveChunkTime = moveChunkTo(coll.getFullName(), toShard);
var afterStatus = getServerStatus(db);
print("Ending moveChunk... ");
var bgThroughput = getStatusDiffThroughput(beforeStatus, afterStatus);
bgInsertThroughputs.push(bgThroughput.insert);
bgQueryThroughputs.push(bgThroughput.query);
throughputs.push(chunkSize * 1000 / moveChunkTime);
jsTest.log("moveChunk to shard " + i%maxMoves + " takes " + moveChunkTime +
" ms with\t secondaryThrottle=" + secondary_throttle +
" nThreads=" + nThreads * 4);
jsTest.log("Background Throughput During Chunk Migration: " + tojson(bgThroughput));
}
// Record throughput while load is still running but moveChunk is done.
print("Ending Chunk Migrations... ");
if (nThreads > 0) {
sleep(6000);
print("Ending benchRun... ");
benchRunTasks.forEach(function(benchRunTask) {
var benchResults = benchFinish(benchRunTask);
print("BenchRun Results: " + tojson(benchResults));
});
}
// Record throughput when load is finished and moveChunk is done.
sleep(4000);
moveChunkEndCounter.countDown();
serverStatusThread.join();
var serverStatusData = serverStatusThread.returnData();
convertStatusResultsToThroughput(serverStatusData);
reportThroughput(getTestName(),
findMedian(throughputs, "moveChunk throughputs"), {
nThread: nLogThreads,
});
// We report our own background throughput and don't rely on benchRun's own results
// because benchRun counts the throughput before and after the moveChunk as well.
reportThroughput("bg_baseline_insert_" + getTestName(),
bgBaseThroughput.insert, {
nThread: nLogThreads,
});
reportThroughput("bg_baseline_query_" + getTestName(),
bgBaseThroughput.query, {
nThread: nLogThreads,
});
reportThroughput("bg_insert_" + getTestName(),
findMedian(bgInsertThroughputs, "Background insert throughputs"), {
nThread: nLogThreads,
});
reportThroughput("bg_query_" + getTestName(),
findMedian(bgQueryThroughputs, "Background query throughputs"), {
nThread: nLogThreads,
});
coll.drop();
}
function increaseMigrationWaitTimeoutAndOrphanCleanupDelay() {
var configDB = db.getSiblingDB("config");
var shards = configDB.shards.find().sort({_id: 1}).toArray();
for (var i = 0; i < shards.length; i++) {
var shard = shards[i];
// Connect to each shard, check the binary version, and increase migration-related timeouts.
var shardHostAuth;
if (authEnabled) {
// Slice off the leading rs*/ prefix from the shard's host string before building the auth URI.
var shardHostParse = shard.host.slice(4);
shardHostAuth = 'mongodb://'.concat(username, ':', password, '@', shardHostParse);
} else {
shardHostAuth = shard.host;
}
var mongo = new Mongo(shardHostAuth);
var shardDB = mongo.getDB("admin");
var buildInfo = shardDB.adminCommand({buildInfo: 1});
if (buildInfo.versionArray[0] >= 5 || (buildInfo.versionArray[0] === 4 && buildInfo.versionArray[1] === 4)) {
shardDB.adminCommand({setParameter: 1, receiveChunkWaitForRangeDeleterTimeoutMS: 3600000});
}
if (buildInfo.versionArray[0] >= 9 || (buildInfo.versionArray[0] === 8 && buildInfo.versionArray[1] >= 1)) {
shardDB.adminCommand({setParameter: 1, orphanCleanupDelaySecs: 900});
}
}
}
increaseMigrationWaitTimeoutAndOrphanCleanupDelay();
thread_levels.forEach(function(nThreads) {
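// Each of the four benchRun operation types gets nThreads / 4 parallel threads, so the
// total background thread count matches the configured level.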
var nThreadsNormalized = Math.floor(nThreads / 4);
testMoveChunkWithLoad("move_chunk_throttle_" + secondary_throttle + " " + nThreads, nThreadsNormalized);
});
})();