【问题标题】:Aggregate and update MongoDB聚合和更新 MongoDB
【发布时间】:2019-05-10 15:19:39
【问题描述】:

我有 2 个收藏:

  • 客户(6 000 000 份文件)
  • 订单(50 000 000 份文件)

每天一次,我想按客户计算过去一年、过去一个月和过去一周等的订单数。

我试过这个:

db.orders.aggregate(
    {$match: 
        { date_order: { $gt: v_date1year } }
    },
    {$group : {
        _id : "$id_client", 
        count : {$sum : 1}
    }} ,
    {
        "$out": "tmp_indicators"
    }
)

db.tmp_indicators.find({}).forEach(function (my_client) { 
    db.clients.update (
        {"id_client": my_client._id},
        {"$set": 
            { "nb_orders_1year" : my_client.count }
        }
    )
})

我必须这样做 3 次,1 次用于过去一年的聚合,1 次用于过去一个月,1 次用于过去一周。 治疗很慢,您知道如何以更好的方式执行吗?

【问题讨论】:

    标签: mongodb mongodb-query aggregation-framework


    【解决方案1】:

    为了提高性能,尤其是在处理大型集合时,请利用 Bulk() API 进行批量更新,因为您将批量发送操作到服务器(例如,说批量大小为 1000),这可以为您提供更好的性能,因为您不会将每个请求都发送到服务器(就像您当前在 forEach() 循环中使用更新语句所做的那样),但只发送一次在每 1000 个请求中,从而使您的更新比当前更有效和更快。

    以下示例演示了这种方法,第一个示例使用 MongoDB 版本 >= 2.6 and < 3.2 中提供的 Bulk() API。它通过使用聚合结果中的值更改 nb_orders_1year 字段来更新 clients 集合中的所有文档。

    由于aggregate()方法返回一个cursor你可以使用聚合输出集合的forEach()方法对其进行迭代并访问每个文档,从而批量设置批量更新操作,然后通过 API 高效地发送到服务器:

    var bulk = db.clients.initializeUnorderedBulkOp(),
        pipeline = [
            {
                "$match": { "date_order": { "$gt": v_date1year } }
            },
            {
                "$group": {
                    "_id": "$id_client", 
                    "count": { "$sum" : 1 }
                }
            },
            { "$out": "tmp_indicators" }        
        ],
        counter = 0;
    
    db.orders.aggregate(pipeline);  
    db.tmp_indicators.find().forEach(function (doc) {       
        bulk.find({ "_id": doc._id }).updateOne({ 
            "$set": { "nb_orders_1year": doc.count }
        });
    
        counter++;
        if (counter % 1000 == 0) {
            bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
            bulk = db.clients.initializeUnorderedBulkOp();
        }
    });
    // Clean up remaining operations in queue
    if (counter % 1000 != 0) { bulk.execute(); }
    

    下一个示例适用于新的 MongoDB 版本 3.2,该版本自 deprecated the Bulk API 起使用并使用 bulkWrite() 提供了一组更新的 api。

    它使用与上面相同的游标,但不是迭代结果,而是使用其 map() 方法创建具有批量操作的数组:

     var pipeline = [
            {
                "$match": { "date_order": { "$gt": v_date1year } }
            },
            {
                "$group": {
                    "_id": "$id_client", 
                    "count": { "$sum" : 1 }
                }
            },
            { "$out": "tmp_indicators" }        
        ];
    db.orders.aggregate(pipeline);
    var bulkOps = db.tmp_indicators.find().map(function (doc) { 
            return { 
                "updateOne": { 
                    "filter": { "_id": doc._id } ,              
                    "update": { "$set": { "nb_orders_1year": doc.count } } 
                }         
            };
        });
    
    db.clients.bulkWrite(bulkOps, { "ordered": true });
    

    【讨论】:

    • 感谢您的解决方案,但由于文档的大小限制,使用光标不起作用。我有这个问题“异常:聚合结果超过最大文档大小(16MB)”。这就是我使用“$out”实用程序的原因
    • @Mouette 你是对的,我在最初的回复中没有考虑到这一点,但后来我更新了我的答案以使用聚合输出集合的光标。
    • 如果是第一个解决方案,每 1000 次操作执行一次批量的目的是什么?批量实用程序不能一次处理 6 000 000 次更新(每个客户端最多 1 次)吗?
    • 有了如此庞大的批量大小,您肯定会受到 16BM BSON 限制约束,因此选择较小的可管理批量大小。默认值为 1000(最大值)。
    猜你喜欢
    • 2021-11-04
    • 1970-01-01
    • 2015-03-13
    • 1970-01-01
    • 2018-06-14
    • 1970-01-01
    • 2022-12-02
    • 2020-04-25
    • 2023-03-21
    相关资源
    最近更新 更多