为了提高性能,尤其是在处理大型集合时,请利用 Bulk() API 进行批量更新,因为您将批量发送操作到服务器(例如,说批量大小为 1000),这可以为您提供更好的性能,因为您不会将每个请求都发送到服务器(就像您当前在 forEach() 循环中使用更新语句所做的那样),但只发送一次在每 1000 个请求中,从而使您的更新比当前更有效和更快。
以下示例演示了这种方法,第一个示例使用 MongoDB 版本 >= 2.6 and < 3.2 中提供的 Bulk() API。它通过使用聚合结果中的值更改 nb_orders_1year 字段来更新 clients 集合中的所有文档。
由于aggregate()方法返回一个cursor,你可以使用聚合输出集合的forEach()方法对其进行迭代并访问每个文档,从而批量设置批量更新操作,然后通过 API 高效地发送到服务器:
var bulk = db.clients.initializeUnorderedBulkOp(),
pipeline = [
{
"$match": { "date_order": { "$gt": v_date1year } }
},
{
"$group": {
"_id": "$id_client",
"count": { "$sum" : 1 }
}
},
{ "$out": "tmp_indicators" }
],
counter = 0;
db.orders.aggregate(pipeline);
db.tmp_indicators.find().forEach(function (doc) {
bulk.find({ "_id": doc._id }).updateOne({
"$set": { "nb_orders_1year": doc.count }
});
counter++;
if (counter % 1000 == 0) {
bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
bulk = db.clients.initializeUnorderedBulkOp();
}
});
// Clean up remaining operations in queue
if (counter % 1000 != 0) { bulk.execute(); }
下一个示例适用于新的 MongoDB 版本 3.2,该版本自 deprecated the Bulk API 起使用并使用 bulkWrite() 提供了一组更新的 api。
它使用与上面相同的游标,但不是迭代结果,而是使用其 map() 方法创建具有批量操作的数组:
var pipeline = [
{
"$match": { "date_order": { "$gt": v_date1year } }
},
{
"$group": {
"_id": "$id_client",
"count": { "$sum" : 1 }
}
},
{ "$out": "tmp_indicators" }
];
db.orders.aggregate(pipeline);
var bulkOps = db.tmp_indicators.find().map(function (doc) {
return {
"updateOne": {
"filter": { "_id": doc._id } ,
"update": { "$set": { "nb_orders_1year": doc.count } }
}
};
});
db.clients.bulkWrite(bulkOps, { "ordered": true });