/**
* @file
* This file tests the time it takes for mongos to route reads with a varying sized catalog.
*
* ### *Test*
*
* Compare the latency of a fixed workload for various catalog sizes.
*
* The workload consists of creating a varying number of sharded collections and then reading a
* single document through mongos
* ({@link https://docs.mongodb.com/manual/reference/method/db.collection.findOne/|findOne}).
* For performance reasons, the collection and chunk entries are inserted directly into
* config.collections and config.chunks instead of being created with shardCollection.
*
* The scenarios tested are:
* 1. **fresh mongos**: read through a mongos that has not yet seen any data.
* 1. **steady mongos**: read from a collection through a mongos that has already seen this
* collection.
* 1. **new collection**: read from a collection through a mongos that has not seen this collection,
* but that has already seen all other collections in this db.
*
* Results are reported as the time in ms it takes to read in each scenario.
*
* ### *Setup*
* Requires a sharded cluster with two mongoses.
*
* ### *Notes*
* * This test inserts a varying number of entries into config.collections and config.chunks at the
* start of each iteration.
* * This test uses a different database each iteration. The overall amount of data on the config
* servers should not affect the time to read, only the amount for a particular database.
*
* ### *Owning-team*
* mongodb/sharding
*
* @module workloads/mongos_large_catalog_cache
*/
/*global
db sharded_cluster Random enableSharding shardCollection quiesceSystem
benchRun benchStart benchFinish sh print printjson assert tojson
reportThroughput sleep server jsTest version findPrimaryShard ScopedThread
*/
/**
 * The number of collections to "create" (insert directly into the catalog) before doing reads.
 * runTest takes one full set of measurements for every entry in this list.
 * The `x = x || default` form keeps a value that was already assigned to the variable before this
 * file is evaluated and only falls back to the default when the existing value is falsy.
 */
var numCollectionsToCreate = numCollectionsToCreate || [0, 1000, 10000, 100000, 1000000];
/**
 * The mongos hosts assigned IP addresses. The expected format is a list of dictionaries containing
 * a "private_ip" field.
 * This parameter is used to connect through multiple mongoses in a sharded cluster.
 * If left unassigned, the workload and listeners will connect to the default mongos.
 * This test only runs on sharded clusters.
 *
 * NOTE(review): the fallback below is a single plain string rather than objects with a
 * "private_ip" field, while runTest reads `private_ip` from every entry and asserts that there are
 * at least two targets. The fallback therefore cannot satisfy runTest as written - confirm intent.
 */
var mongos_hosts = mongos_hosts || ["localhost:27017"];
/**
 * The major version of the server required to infer the FCV in use.
 * This is the first dot-separated component of db.version(), kept as a string. createCollections
 * compares it against 5 to decide which chunk document format to insert.
 */
var majorVersion = db.version().split(".")[0];
/**
 * Queue the catalog documents for one synthetic sharded collection named "coll_<i>": one document
 * for config.collections and one document describing a single chunk that spans MinKey..MaxKey on
 * the first shard. This mimics the catalog entries of a sharded collection with one chunk.
 *
 * Nothing is written here; the documents are only added to the two bulk operations, which the
 * caller executes. The function is handed to worker threads as an argument by createCollections,
 * so it only uses its parameters and shell built-ins.
 *
 * @param {Object} bulkCollections - bulk operation that receives the collection document.
 * @param {Object} bulkChunks - bulk operation that receives the chunk document.
 * @param {string} dbName - database the synthetic collection belongs to.
 * @param {Array} shard_ids - shard ids; only the first one is used as the chunk owner.
 * @param {number} i - index used to build the collection name.
 * @param {boolean} chunksByNS - when truthy the chunk document carries an "ns" field holding the
 *     namespace, otherwise it carries the collection "uuid".
 */
var createCollectionAndChunkEntries = function(bulkCollections, bulkChunks, dbName, shard_ids, i, chunksByNS) {
    var namespace = dbName + ".coll_" + i;
    var epoch = ObjectId();
    var now = new Date();
    var collectionUuid = UUID();

    bulkCollections.insert({
        "_id": namespace,
        "lastmodEpoch": epoch,
        "lastmod": now,
        "dropped": false,
        "key": {"_id": 1},
        "unique": false,
        "uuid": collectionUuid
    });

    // The two chunk formats differ only in their second field, so build the document step by
    // step to keep the field order of each format intact.
    var ownerShard = shard_ids[0];
    var chunkDoc = {"_id": namespace + "-_id_MinKey"};
    if (chunksByNS) {
        chunkDoc.ns = namespace;
    } else {
        chunkDoc.uuid = collectionUuid;
    }
    chunkDoc.min = {"_id": MinKey};
    chunkDoc.max = {"_id": MaxKey};
    chunkDoc.shard = ownerShard;
    chunkDoc.lastmod = now;
    chunkDoc.lastmodEpoch = epoch;
    chunkDoc.history = [{"validAfter": Timestamp(0, 0), "shard": ownerShard}];
    bulkChunks.insert(chunkDoc);
};
/**
 * Insert nCollections entries directly in config.collections and config.chunks. This mimics the
 * entries created during shardCollection without actually calling it nCollections times, which
 * would take too long when nCollections is very large. The work is spread over 32 threads to speed
 * up the time it takes to insert.
 *
 * After all threads have finished, the number of inserted collection entries for dbName is
 * verified against nCollections.
 *
 * Reads the globals `authEnabled`, `majorVersion` and `createCollectionAndChunkEntries`.
 *
 * @param {string} mongos - host of the mongos to insert through.
 * @param {number} nCollections - total number of collection entries to insert.
 * @param {string} dbName - database the synthetic collections belong to.
 * @param {string} username - user name, only used when authEnabled is truthy.
 * @param {string} password - password, only used when authEnabled is truthy.
 */
var createCollections = function(mongos, nCollections, dbName, username, password) {
    var nThreads = 32;
    var nCollectionsPerThread = Math.floor(nCollections / nThreads);
    var mongosAuth =
        authEnabled ? 'mongodb://'.concat(username, ':', password, '@', mongos) : mongos;

    // Body of one worker thread. Everything it needs is passed as an argument (including the
    // entry-building function) instead of being captured from the enclosing scope - presumably
    // because ScopedThread runs the function in a separate scope, so keep it self-contained.
    // The mongos, username and password parameters are unused but kept so the argument list
    // handed to ScopedThread stays unchanged.
    function createCollectionsPerThread(mongos,
                                        nCollections,
                                        threadNum,
                                        dbName,
                                        nCollectionsPerThread,
                                        nThreads,
                                        chunksByNS,
                                        createCollectionAndChunkEntries,
                                        username,
                                        password,
                                        mongosAuth) {
        var config = new Mongo(mongosAuth).getDB("config");
        var shard_ids = config.shards.find().toArray().map(function(shard) {
            return shard._id;
        });
        var bulkCollections = config.collections.initializeUnorderedBulkOp();
        var bulkChunks = config.chunks.initializeUnorderedBulkOp();

        // Every thread owns nCollectionsPerThread consecutive indexes. If nCollections cannot be
        // evenly divided by nThreads, the last thread also takes the leftover collections.
        var first = threadNum * nCollectionsPerThread;
        var end = (threadNum === nThreads - 1) ? nCollections : first + nCollectionsPerThread;
        for (var i = first; i < end; i++) {
            createCollectionAndChunkEntries(
                bulkCollections, bulkChunks, dbName, shard_ids, i, chunksByNS);
        }
        assert.writeOK(bulkCollections.execute());
        assert.writeOK(bulkChunks.execute());
    }

    var threads = [];
    for (var threadNum = 0; threadNum < nThreads; threadNum++) {
        var thread = new ScopedThread(createCollectionsPerThread,
                                      mongos,
                                      nCollections,
                                      threadNum,
                                      dbName,
                                      nCollectionsPerThread,
                                      nThreads,
                                      majorVersion < 5,
                                      createCollectionAndChunkEntries,
                                      username,
                                      password,
                                      mongosAuth);
        thread.start();
        threads.push(thread);
    }
    threads.forEach(function(thread) {
        thread.join();
    });

    // Immediately run garbage collection which cleans up new connections and implicit sessions
    // created by the threads. If the cleanup happens at a later time it can stall a later command
    // from running leading to incorrect performance execution timing. SEE: BF-29997.
    gc();

    // Count the entries that were just inserted. The pattern is anchored and the database name is
    // escaped so that only ids starting with "<dbName>.coll_" are counted; an unanchored pattern
    // with a bare "." would also match unrelated namespaces that merely contain the name.
    var escapedDbName = dbName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
    assert.eq(nCollections,
              new Mongo(mongosAuth)
                  .getDB("config")
                  .collections.find({"_id": {"$regex": "^" + escapedDbName + "\\.coll_"}})
                  .count());
};
/**
 * Read from a collection on every given shard so that the shard has the collection entry in its
 * cache. This way, when we read from mongos the shard will not have to refresh as well.
 *
 * Reads the globals `authEnabled`, `username` and `password`; the credentials are not parameters
 * of this function.
 *
 * @param {Array} shards - shard documents; each must have a `host` field.
 * @param {string} coll_name - name of the collection to read from.
 * @param {string} dbName - name of the database that holds the collection.
 */
var readOnShards = function(shards, coll_name, dbName) {
    shards.forEach(function(shard) {
        var shardUri = shard.host;
        if (authEnabled) {
            // Drop the leading "<replica set name>/" from the list of members in the replica
            // set. Cutting at the first "/" works for set names of any length; a host without a
            // "/" is used unchanged.
            var members = shard.host.slice(shard.host.indexOf("/") + 1);
            shardUri = 'mongodb://'.concat(username, ':', password, '@', members);
        }
        var shardConn = new Mongo(shardUri);
        shardConn.getDB(dbName).getCollection(coll_name).findOne();
    });
};
/**
 * Run the workload: for every catalog size in numCollectionsToCreate, repeat five iterations that
 * each use a fresh database, fill the catalog, and time four reads through the reading mongos:
 *   1. first read of "test" after the router cache was flushed (fresh mongos),
 *   2. the same read again (steady mongos),
 *   3. first read of a newly sharded collection "test_new" in the same database,
 *   4. the same read of "test_new" again.
 * Each latency is reported (negated) through reportThroughput, and after the five iterations the
 * spread of each measurement, (max - min) / median in percent, is printed.
 *
 * Reads the globals `mongos_hosts`, `numCollectionsToCreate`, `authEnabled`, `username` and
 * `password`. Prints a message and returns without measuring when the cluster is not sharded.
 */
var runTest = function() {
    if (!sharded_cluster()) {
        print("Workload 'mongos_large_catalog_cache' must be run in a sharded environment.");
        return;
    }

    var iterationsPerSize = 5;

    // Build the string passed to Mongo() for a mongos host, adding credentials when auth is on.
    function connectionString(host) {
        return authEnabled ? 'mongodb://'.concat(username, ':', password, '@', host) : host;
    }

    // Time, in ms, one findOne({"x": 1}) on dbName.collName through the reading mongos.
    function timedFindOne(dbName, collName) {
        var start = new Date().getTime();
        readingMongosConn.getDB(dbName).getCollection(collName).findOne({"x": 1});
        return new Date().getTime() - start;
    }

    // (max - min) / median of the values, in percent. The copy is sorted with a numeric
    // comparator: the default sort compares elements as strings, which orders e.g. 100 before 20
    // and would pick the wrong median. A median of 0 yields NaN or Infinity.
    function percentRange(values) {
        var ascending = values.slice().sort(function(a, b) {
            return a - b;
        });
        var median = ascending[Math.floor(ascending.length / 2)];
        return (ascending[ascending.length - 1] - ascending[0]) / median * 100;
    }

    // Set up mongos connections: the first host is written through, the second is the one whose
    // routing latency is measured.
    var mongos_targets = mongos_hosts.map(function(x) {
        return x.private_ip;
    });
    assert.gte(mongos_targets.length, 2);
    var writingMongos = mongos_targets[0];
    var readingMongos = mongos_targets[1];
    var writingMongosConn = new Mongo(connectionString(writingMongos));
    var readingMongosConn = new Mongo(connectionString(readingMongos));
    var shards = writingMongosConn.getDB("config").shards.find().toArray();

    // Get latencies for reads through mongos for varying catalog sizes. Repeat for each catalog
    // size so that the spread of the measurements can be reported.
    numCollectionsToCreate.forEach(function(nCollections) {
        print("Running with collections: " + nCollections);
        var metricPrefix = "mongos_large_catalog_" + nCollections + "_colls_";
        var freshLatencies = [];
        var steadyLatencies = [];
        var newCollLatencies = [];
        var newCollSteadyLatencies = [];

        for (var iteration = 0; iteration < iterationsPerSize; iteration++) {
            var largeDbName = "huge_sharding_catalog_db_" + nCollections + "_iter_" + iteration;
            var largeDb = writingMongosConn.getDB(largeDbName);
            var test_ns = largeDbName + '.test';
            var test_new_ns = largeDbName + '.test_new';

            // Force mongos to clear their cache.
            writingMongosConn.getDB("admin").runCommand({flushRouterConfig: 1});
            readingMongosConn.getDB("admin").runCommand({flushRouterConfig: 1});

            // Shard and insert into "test" collection.
            assert.commandWorked(writingMongosConn.adminCommand({enableSharding: largeDbName}));
            assert.commandWorked(
                writingMongosConn.adminCommand({shardCollection: test_ns, key: {"_id": 1}}));
            assert.commandWorked(largeDb.getCollection("test").insert({"x": 1}));

            // Insert nCollections into the catalog.
            createCollections(writingMongos, nCollections, largeDbName, username, password);

            // Make sure shard is aware of "test" collection that we will read from mongos later.
            readOnShards(shards, "test", largeDbName);
            quiesceSystem();

            // Time how long it takes for a fresh mongos to read in all the collections in the
            // catalog when doing a find on a collection it has never seen before.
            var freshLatency = timedFindOne(largeDbName, "test");
            freshLatencies.push(freshLatency);

            // Run the same find as above to time how long it takes for mongos to read from a
            // collection it has seen before - it should not have to do a refresh because it
            // should have already refreshed its cache.
            var steadyLatency = timedFindOne(largeDbName, "test");
            steadyLatencies.push(steadyLatency);

            // Shard and insert into a new collection "test_new" in the same database.
            assert.commandWorked(
                writingMongosConn.adminCommand({shardCollection: test_new_ns, key: {"_id": 1}}));
            assert.commandWorked(largeDb.getCollection("test_new").insert({"x": 1}));

            // Make sure shard is aware of "test_new" collection that we will read from mongos
            // later.
            readOnShards(shards, "test_new", largeDbName);
            quiesceSystem();

            // Time how long it takes for a mongos to read a collection it has not seen before,
            // that is in a database that this mongos has already seen. It should only have to
            // load the one collection, because it already loaded all of the others.
            var newCollLatency = timedFindOne(largeDbName, "test_new");
            newCollLatencies.push(newCollLatency);

            // Run the same find as above. This should have the same latency as the steady read
            // of "test", because in both cases we are reading from a collection we've already
            // seen and have already cached catalog information.
            var newCollSteadyLatency = timedFindOne(largeDbName, "test_new");
            newCollSteadyLatencies.push(newCollSteadyLatency);

            // Negate the latencies because currently sys-perf sees higher values as better. In
            // this case, we are reporting latencies, so lower is better when not negated.
            reportThroughput(metricPrefix + "fresh_mongos", -freshLatency, {nThread: 1});
            reportThroughput(metricPrefix + "steady_mongos", -steadyLatency, {nThread: 1});
            reportThroughput(
                metricPrefix + "new_coll_same_db_mongos", -newCollLatency, {nThread: 1});
            reportThroughput(metricPrefix + "new_coll_same_db__steady_mongos",
                             -newCollSteadyLatency,
                             {nThread: 1});
        }

        // Print (max - min) / median for the measured latencies for each of the different reads.
        print("range for fresh mongos with " + nCollections +
              " collections: " + percentRange(freshLatencies));
        print("range for steady mongos with " + nCollections +
              " collections: " + percentRange(steadyLatencies));
        print("range for new collection same database with " + nCollections +
              " collections: " + percentRange(newCollLatencies));
        print("range for new collection same database steady with " + nCollections +
              " collections: " + percentRange(newCollSteadyLatencies));
    });
};
// Execute the workload as soon as this file is evaluated.
runTest();