【问题标题】:MongoDB skip stages on pipeline?MongoDB跳过管道上的阶段?
【发布时间】:2020-09-16 20:29:05
【问题描述】:

我想知道是否有任何方法可以跳过聚合管道上的阶段,更具体地说,如果 $lookup 阶段之一找到匹配项,则停止并返回。

我需要一个查询来从其他类型和/或组中检索“继承的”数据。在本例中,我有三个不同的表:devices_propertiestypes_propertiesgroup_properties,其中存储了每个设备、类型或组的属性。

如果设备定义了属性,即geofences,则可以直接从devices_properties读取,如果没有,则需要检查它的type和/或它的group看是否在那里定义。如果在其类型上找到,则无需签入组。

我有一个查询,它通过检查其类型/组,并对不同的表执行 $lookup 来工作。然后,通过一个开关,它返回相应的文档。然而,这并不是最优的,因为很多时候该属性将位于第一张桌子上:devices_properties。在这种情况下,它会进行 3 次不必要的查找,因为不需要检查设备类型和组,也不需要检查它们各自的属性。不确定我解释的是否正确。

我知道的查询如下。有什么办法可以优化吗?即,如果有匹配项,则在第一个 $lookup 之后停止?

db.devices.aggregate([
    {"$match" : { "_id": "alvarolb@esp32"}},
    {"$project" : {
        "_id": false,
        "asset_group": {"$concat" : ["alvarolb", "@", "$asset_group", ":", "geofences"]},
        "asset_type": {"$concat" : ["alvarolb", "@", "$asset_type", ":", "geofences"]}
     }},
     {"$lookup" : {
        "from": "devices_properties",
        "pipeline": [ 
            {"$match" : {"_id": "alvarolb@esp32:geofences"}},
        ],
        "as": "device"
    }},
    { "$unwind": {
        "path": "$device",
        "preserveNullAndEmptyArrays": true
    }},
    {"$lookup" : {
        "from": "groups_properties",
        "let" : {"asset_group" : "$asset_group"},
        "pipeline": [ 
            {"$match" : {"$expr" : { "$eq" : ["$_id", "$$asset_group"]}}}
        ],
        "as": "group"
    }},
    { "$unwind": {
        "path": "$group",
        "preserveNullAndEmptyArrays": true
    }},
    {"$lookup" : {
        "from": "types_properties",
        "let" : {"asset_type" : "$asset_type"},
        "pipeline": [ 
            {"$match" : {"$expr" : { "$eq" : ["$_id", "$$asset_type"]}}}
        ],
        "as": "type"
    }},
    { "$unwind": {
        "path": "$type",
        "preserveNullAndEmptyArrays": true
    }},
    {"$project" : {
        "value": {
            "$switch" : {
                "branches" : [
                    {"case": "$device", "then" : "$device"},
                    {"case": "$type", "then" : "$type"},
                    {"case": "$group", "then" : "$group"}
                ],
                "default": {}
            }
        }
    }},
    {"$replaceRoot": { "newRoot": "$value"}}
]);

谢谢!

【问题讨论】:

    标签: mongodb mongodb-query aggregation-framework


    【解决方案1】:

    我怀疑这个特定的查询需要优化,但聚合管道中的条件阶段通常是一个有趣的问题。

    首先,在第一阶段,您最多可以按索引字段选择一个文档,这已经是非常理想的了。你所有的查找都是一样的,所以我们谈论的是整个管道的几十毫秒的量级,即使是在大型集合上也是如此。值得优化吗?

    对于更一般的情况,当查找确实很昂贵时,您可以结合使用 $facet 来运行条件管道和 $concatArrays 来合并结果。

    第一次查找保持原样:

    db.devices.aggregate([
         ....
         {"$lookup" : {
            "from": "devices_properties",
            "pipeline": [ 
                {"$match" : {"_id": "alvarolb@esp32:geofences"}},
            ],
            "as": "device"
        }},
    

    然后我们添加一个指示器是否返回任何结果,这样我们就不需要更多的查找了:

    {$addFields:{found: {$size: "$device"}}},
    

    然后我们在 facet 中定义了 2 个管道:一个有下一个查找,另一个没有。要运行的开关是每个管道中的第一个 $match 阶段:

    {$facet:{
        yes:[
            {$match: {"$expr" : {$gt:["$found", 0]}}},
        ],
        no:[
            {$match: {"$expr" : {$eq:["$found", 0]}}},
            {"$lookup" : {
                "from": "groups_properties",
                "let" : {"asset_group" : "$asset_group"},
                "pipeline": [ 
                    {"$match" : {"$expr" : { "$eq" : ["$_id", "$$asset_group"]}}}
                ],
                "as": "group"
            }}
        ]
    }},
    

    在这个阶段之后,我们有 2 个数组“yes”和“no”,其中一个总是空的。合并两者并转换为*文档:

    {$addFields: {yesno: {$concatArrays:["$yes", "$no"]}}},
    {$unwind: "$yesno"},
    {"$replaceRoot": { "newRoot": "$yesno"}},
    

    如果到目前为止我们发现了什么,则重新计算指标:

    {$addFields:{found: {$add: [ "$found", {$size: {$ifNull:["$group", []]}}]}}},
    

    并为下一次查找重复相同的技术:

    $facet with $lookup in `groups_properties`  
    $addFields with $concatArrays 
    $unwind
    $replaceRoot
    

    那么您是否以类似的方式types_properties 并最终确定它的投影/替换根,就像在原始管道中一样。

    【讨论】:

    • 嗨,Alex,非常感谢您的洞察力。我认为不值得优化这个查询。无论如何,我喜欢你建议的条件管道方法:)
    • 虽然它不适合我的用例。但我喜欢创意。