/**
* @file
* Test performance of each section of chunk migration under load
* <br>
*
* ### *Test*
*
* Move one chunk, containing 25k documents, from one shard to another.
*
* While the chunk is migrating, benchRun is used to apply a mixed read/write load to
* the documents in the migrating chunk.
*
* The goal of this test is to measure the time spent in each section of chunk
* migration, as well as the impact of each section of chunk migration on other
* commands.
*
* Unless the user specifies otherwise, the test uses the default value for
* {@link
* https://docs.mongodb.com/manual/core/sharding-balancer-administration/index.html#chunk-migration-and-replication|_secondaryThrottle}
* (false for WiredTiger and true for MMAPv1)
*
* The test takes an optional 'threadLevels' parameter: an array specifying the number
* of threads to use for load.
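*
* For example, setting threadLevels = [0, 4] before this file is loaded runs only the
* unloaded and four-thread cases.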
*
* Uses
* {@link
* https://docs.mongodb.com/manual/core/sharding-balancer-administration/#chunk-migration-procedure|moveChunk}
* command
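*
* Each migration issued by the test has the form below; _secondaryThrottle is attached
* only when it was explicitly configured:
*
*     db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShard, _waitForDelete: true})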
*
* Results are reported by dsi/dsi/libanalysis/move_chunk_log_analysis.
*
* ### *Setup*
*
* Requires a sharded cluster.
*
* The test inserts 25k documents into the first chunk (the one that is moved). There
* are also 500,000 empty chunks (configurable via numChunks). The chunk is moved 5
* times per thread level.
*
* Field 'a' is indexed and contains uniformly random integers.
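*
* Each document has the form {_id: <sequential int>, a: <random int>, c: <100-char string>}.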
*
* ### *Owning-team*
* mongodb/sharding
*
* @module workloads/move_chunk_large_chunk_map
*/
/* global db sharded_cluster sh print tojson assert sleep quiesceSystem benchStart benchFinish authEnabled username password server jsTest Random Mongo MinKey MaxKey Timestamp */
/**
* The _secondaryThrottle parameter to the moveChunk command.
*/
var secondaryThrottle;
/**
* The number of threads to run in parallel. The default is [0, 2, 4].
* **Note:** Thread count should be divisible by 2, and this test cannot handle thread levels
* greater than 4. Higher thread levels risk exceeding the 500MB memory limit for the transfer mods
* queue and causing the chunk migration to fail.
*/
var threadLevels = threadLevels || [0, 2, 4];
/**
* The number of chunks to create. The default is 500,000.
*/
var numChunks = numChunks || 500000;
/**
* The major version of the server, used to infer the FCV in use.
*/
var majorVersion = parseInt(db.version().split(".")[0], 10);
(function() {
if (!sharded_cluster()) {
print("move_chunk_large_chunk_map will only run in a sharded cluster.\nSkip...");
return;
}
assert(Array.isArray(threadLevels), "threadLevels variable must be an array");
threadLevels.forEach(function(nThreads) {
assert.eq(0, nThreads % 2, "Thread level must be divisible by 2, but got " + tojson(nThreads));
});
sh.stopBalancer();
var dbName = "mctest";
var _db = db.getSiblingDB(dbName);
var config = _db.getSiblingDB("config");
var numDocs = 25000;
var shards = config.shards.find().toArray();
var numShards = shards.length;
var shardIds = [];
shards.forEach(function(shard) {
shardIds.push(shard._id);
});
/**
* Create numChunks chunk entries to imitate calling the splitChunk command numChunks
* times. Each chunk covers an _id range of width 1, starting at numDocs.
*/
function initChunks(ns, numChunks) {
var collUUID = config.collections.findOne({_id: ns}).uuid;
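// Before 5.0, config.chunks documents are identified by namespace ("ns"); from 5.0
// onwards they are identified by the collection UUID, so build version-appropriate
// queries and chunk entries.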
var chunksUpdateQuery = majorVersion < 5 ?
{ns: ns, min: {_id: MinKey}, max: {_id: MaxKey}} :
{uuid: collUUID, min: {_id: MinKey}, max: {_id: MaxKey}};
assert.commandWorked(config.chunks.update(chunksUpdateQuery, {$set: {max: {_id: numDocs}}}));
var chunksFindQuery = majorVersion < 5 ?
{ns: ns, min: {_id: MinKey}, max: {_id: numDocs}} :
{uuid: collUUID, min: {_id: MinKey}, max: {_id: numDocs}};
var chunk = config.chunks.findOne(chunksFindQuery);
var bulkChunks = config.chunks.initializeUnorderedBulkOp();
for (var i = 0; i < numChunks; i++) {
var minKey = i + numDocs;
var maxKey = (i === numChunks - 1) ? MaxKey : minKey + 1;
var chunkEntry = majorVersion < 5 ? {
"ns": ns,
"min": {"_id": minKey},
"max": {"_id": maxKey},
"shard": chunk.shard,
"lastmod": Timestamp(minKey, 0),
"lastmodEpoch": chunk.lastmodEpoch,
"history": [{"validAfter": Timestamp(0, 0), "shard": chunk.shard}]
} : {
"uuid": collUUID,
"min": {"_id": minKey},
"max": {"_id": maxKey},
"shard": chunk.shard,
"lastmod": Timestamp(minKey, 0),
"history": [{"validAfter": Timestamp(0, 0), "shard": chunk.shard}]
};
bulkChunks.insert(chunkEntry);
}
assert.commandWorked(bulkChunks.execute());
}
/**
* Shard the collection and insert the documents into the chunk that will be moved.
*/
function initColl(coll) {
var shardConfig = {shardCollection: coll.getFullName(), key: {_id: 1}};
// Retry until the shardCollection command succeeds, backing off briefly between attempts.
while (!_db.adminCommand(shardConfig).ok) {
sleep(100);
}
// Insert a lot of documents into the chunk to be moved.
Random.setRandomSeed(0);
var longStr = "a".repeat(100);
var bulk = coll.initializeUnorderedBulkOp();
for (var i = 0; i < numDocs; i++) {
bulk.insert({
_id: i,
a: Random.randInt(1000000), // indexed
c: longStr // not indexed
});
}
assert.commandWorked(bulk.execute());
// Index a non-_id key.
assert.commandWorked(coll.createIndex({a: 1}));
}
/**
* Move the chunk containing {_id: 0} from its current shard to the destination shard.
*/
function moveChunkTo(ns, toShard) {
var d1 = Date.now();
var moveChunkCommand = {moveChunk: ns, find: {_id: 0}, to: toShard, _waitForDelete: true};
// Use default secondaryThrottle unless it was explicitly specified in the config.
if (secondaryThrottle !== undefined) {
moveChunkCommand["_secondaryThrottle"] = secondaryThrottle;
}
var res = assert.commandWorked(_db.adminCommand(moveChunkCommand));
var d2 = Date.now();
print("moveChunk result: " + tojson(res));
return d2 - d1;
}
/**
* Creates background load with benchRun while the moveChunk is happening.
*/
function startLoad(ns, nThreads) {
// Only test updates and finds since insert and remove would change
// the number of documents being moved in the chunk migration.
// All docs are on the migrating chunk, so all writes will go to that
// chunk.
var benchRuns = [];
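// #RAND_INT and #RAND_STRING are benchRun template specifiers that generate a fresh
// random value per operation, so every update and find targets a random document in
// the migrating chunk.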
var ops = [
{
op: "update",
writeCmd: true,
ns: ns,
query: {_id: {"#RAND_INT": [0, numDocs]}},
update: {$inc: {a: 1}, $set: {c: {"#RAND_STRING": [{"#RAND_INT": [1, 100]}]}}}
},
{op: "findOne", readCmd: true, ns: ns, query: {_id: {"#RAND_INT": [0, numDocs]}}}
];
ops.forEach(function(op) {
benchRuns.push(benchStart({
"ops": [op],
"parallel": nThreads,
"host": server,
"username": username,
"password": password
}));
});
return benchRuns;
}
/**
* MigrationPerf log lines are emitted at debug level, so they need to be explicitly enabled.
*/
function updateVerbosityLevels() {
var cmd = {setParameter: 1, logComponentVerbosity: {sharding: {migrationPerf: 2}}};
assert(_db.getMongo().isMongos() === true);
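// multicast relays the setParameter to the other nodes in the cluster; also run the
// command directly against the mongos so it picks up the new verbosity as well.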
assert.commandWorked(_db.adminCommand({multicast: cmd}));
assert.commandWorked(_db.adminCommand(cmd));
}
/**
* Test moving chunk with nThreads running the background load.
*/
function testMoveChunkWithLoad(collName, nThreads) {
var coll = _db[collName];
initColl(coll);
initChunks(coll.getFullName(), numChunks - 1);
quiesceSystem();
sleep(500);
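// nThreads was halved in runTests(); double it back so logs report the user-facing
// thread level (startLoad() runs two benchRun tasks of nThreads workers each).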
var nLogThreads = nThreads * 2;
updateVerbosityLevels();
jsTest.log("Begin test");
var benchRunTasks;
if (nThreads > 0) {
print("Starting load... ");
benchRunTasks = startLoad(coll.getFullName(), nThreads);
// Let load start before starting move.
sleep(2000);
}
var nMoves = 5;
for (var i = 1; i <= nMoves; i++) {
var toShard = shardIds[i % numShards];
// Let the system settle between moveChunks while background throughput is logged.
sleep(6000);
// Perform the actual moveChunk, recording background throughput before and after.
print("Starting moveChunk... ");
var moveChunkTime = moveChunkTo(coll.getFullName(), toShard);
print("Ending moveChunk... ");
jsTest.log("moveChunk to shard " + toShard + " takes " + moveChunkTime +
" ms with secondaryThrottle=" + tojson(secondaryThrottle) +
" nThreads=" + nLogThreads);
}
// End load.
print("Ending Chunk Migrations... ");
if (nThreads > 0) {
sleep(6000);
print("Ending benchRun... ");
benchRunTasks.forEach(function(benchRunTask) {
benchFinish(benchRunTask);
});
}
// This log is used by the result parser to distinguish tests. JSON.stringify() is used over
// tojson() because a single line JSON is easier for python to parse.
var testEndTime = new Date();
var testData = {
"numChunks": numChunks,
"secondaryThrottle": tojson(secondaryThrottle),
"nThreads": nLogThreads,
"time": testEndTime.toISOString()
};
jsTest.log("Test moveChunk with data: \t" + JSON.stringify(testData));
coll.drop();
}
function increaseMigrationWaitTimeoutAndOrphanCleanupDelay() {
var configDB = db.getSiblingDB("config");
var shards = configDB.shards.find().sort({_id: 1}).toArray();
for (var i = 0; i < shards.length; i++) {
var shard = shards[i];
// Connect to each shard to check the binary version and increase the timeouts.
var shardHostAuth;
if (authEnabled) {
// Strip the leading "<replSetName>/" prefix from the shard host string.
var shardHostParse = shard.host.substring(shard.host.indexOf('/') + 1);
shardHostAuth = 'mongodb://'.concat(username, ':', password, '@', shardHostParse);
} else {
shardHostAuth = shard.host;
}
var mongo = new Mongo(shardHostAuth);
var shardDB = mongo.getDB("admin");
var buildInfo = shardDB.adminCommand({buildInfo: 1});
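// On 4.4+, raise the recipient's wait for overlapping ranges to be deleted so that
// repeated migrations of the same chunk do not time out on orphan cleanup.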
if (buildInfo.versionArray[0] >= 5 || (buildInfo.versionArray[0] === 4 && buildInfo.versionArray[1] === 4)) {
shardDB.adminCommand({setParameter: 1, receiveChunkWaitForRangeDeleterTimeoutMS: 3600000});
}
if (buildInfo.versionArray[0] >= 9 || (buildInfo.versionArray[0] === 8 && buildInfo.versionArray[1] >= 1)) {
shardDB.adminCommand({setParameter: 1, orphanCleanupDelaySecs: 900});
}
}
}
function runTests() {
assert.commandWorked(_db.adminCommand({enableSharding: _db.getName()}));
assert.commandWorked(_db.adminCommand({movePrimary: _db.getName(), to: shardIds[0]}));
threadLevels.forEach(function(nThreads) {
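// startLoad() starts two benchRun tasks (updates and finds), each with nThreads
// parallel workers, so halve the configured level to keep the total thread count
// equal to it.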
var nThreadsNormalized = Math.floor(nThreads / 2);
testMoveChunkWithLoad("move_chunk_threads_" + nThreads, nThreadsNormalized);
});
}
increaseMigrationWaitTimeoutAndOrphanCleanupDelay();
runTests();
})();