【问题标题】:Mongodb aggregation $group, restrict length of arrayMongodb聚合$group,限制数组长度
【发布时间】:2014-08-19 06:20:22
【问题描述】:

我想根据一个字段对所有文档进行分组,但要限制为每个值分组的文档数量。

每条消息都有一个conversation_ID。我需要为每个会话 ID 获取 10 条或更少的消息。

我可以根据以下命令进行分组,但不知道如何限制 除了对结果进行切片之外,分组文档的数量 Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

如何限制每个conversation_ID的msgs数组长度为10?

【问题讨论】:

  • 比您想象的要复杂得多。最重要的是SERVER-6074 和类似的问题。聚合框架不支持$slice 或具有“限制”推送项目的操作。但这是可能的,只是太可怕了。
  • @NeilLunn 经过一番谷歌搜索后偶然发现了SERVER-6074。那么目前手动切片是唯一的选择吗?
  • 不是 only 选项。正如我所说,这是个大问题,所以需要花一点时间来解释这个过程。不过,让其他运营商来做这件事会很好。为 JIRA 问题投票

标签: mongodb mongoose mongodb-query aggregation-framework database


【解决方案1】:

现代

从 MongoDB 3.6 开始,有一种“新颖”的方法,即使用 $lookup 执行“自连接”,其方式与下面演示的原始游标处理方式大致相同。

由于在此版本中,您可以将$lookup"pipeline" 参数指定为“加入”的来源,这实质上意味着您可以使用$match$limit 来收集和“限制”数组:

db.messages.aggregate([
  { "$group": { "_id": "$conversation_ID" } },
  { "$lookup": {
    "from": "messages",
    "let": { "conversation": "$_id" },
    "pipeline": [
      { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
      { "$limit": 10 },
      { "$project": { "_id": 1 } }
    ],
    "as": "msgs"
  }}
])

您可以选择在$lookup 之后添加额外的投影,以使数组项只是值而不是带有_id 键的文档,但只需执行上述操作即可获得基本结果。

仍然有出色的SERVER-9277 直接请求“限制推送”,但在此期间以这种方式使用$lookup 是一个可行的替代方案。

注意:还有$slice,是在写完原答案后介绍的,原内容中“突出JIRA问题”提到。虽然您可以使用较小的结果集获得相同的结果,但它仍然涉及“将所有内容”“推入”数组,然后将最终数组输出限制为所需的长度。

这就是主要区别,也是为什么 $slice 对于大结果通常不实用的原因。但当然可以在它的情况下交替使用。

mongodb group values by multiple fields 上还有更多关于这两种用法的详细信息。


原创

如前所述,这并非不可能,但肯定是一个可怕的问题。

实际上,如果您主要担心生成的数组会非常大,那么您最好的方法是将每个不同的“conversation_ID”作为单独的查询提交,然后合并您的结果。在非常 MongoDB 2.6 的语法中,可能需要根据您的语言实现的实际情况进行一些调整:

var results = [];
db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID"
    }}
]).forEach(function(doc) {
    db.messages.aggregate([
        { "$match": { "conversation_ID": doc._id } },
        { "$limit": 10 },
        { "$group": {
            "_id": "$conversation_ID",
            "msgs": { "$push": "$_id" }
        }}
    ]).forEach(function(res) {
        results.push( res );
    });
});

但这一切都取决于您是否要避免这种情况。等到真正的答案:


这里的第一个问题是没有函数可以“限制”“推入”数组的项目数量。这当然是我们想要的,但该功能目前不存在。

第二个问题是,即使将所有项目推入数组,也不能在聚合管道中使用$slice 或任何类似的运算符。因此,目前没有办法通过简单的操作从生成的数组中获取“前 10”个结果。

但您实际上可以生成一组操作来有效地在分组边界上“切片”。它相当复杂,例如在这里我将把数组元素“切片”减少到“六个”。这里的主要原因是演示该过程并展示如何在不破坏不包含您要“切片”到的总数的数组的情况下执行此操作。

给定一个文档样本:

{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }

您可以在那里看到,当按您的条件分组时,您将获得一个包含十个元素的数组和另一个包含“五个”元素的数组。您在这里想要做的事情将两者都减少到前“六个”,而不会“破坏”只匹配“五个”元素的数组。

还有以下查询:

db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID",
        "first": { "$first": "$_id" },
        "msgs": { "$push": "$_id" },
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "seen": { "$eq": [ "$first", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "seen": { "$eq": [ "$second", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "seen": { "$eq": [ "$third", "$msgs" ] },
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "seen": { "$eq": [ "$forth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "fifth": 1,
        "seen": { "$eq": [ "$fifth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$fifth" },
        "sixth": { "$first": "$msgs" },
    }},
    { "$project": {
         "first": 1,
         "second": 1,
         "third": 1,
         "forth": 1,
         "fifth": 1,
         "sixth": 1,
         "pos": { "$const": [ 1,2,3,4,5,6 ] }
    }},
    { "$unwind": "$pos" },
    { "$group": {
        "_id": "$_id",
        "msgs": {
            "$push": {
                "$cond": [
                    { "$eq": [ "$pos", 1 ] },
                    "$first",
                    { "$cond": [
                        { "$eq": [ "$pos", 2 ] },
                        "$second",
                        { "$cond": [
                            { "$eq": [ "$pos", 3 ] },
                            "$third",
                            { "$cond": [
                                { "$eq": [ "$pos", 4 ] },
                                "$forth",
                                { "$cond": [
                                    { "$eq": [ "$pos", 5 ] },
                                    "$fifth",
                                    { "$cond": [
                                        { "$eq": [ "$pos", 6 ] },
                                        "$sixth",
                                        false
                                    ]}
                                ]}
                            ]}
                        ]}
                    ]}
                ]
            }
        }
    }},
    { "$unwind": "$msgs" },
    { "$match": { "msgs": { "$ne": false } }},
    { "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }}
])

你会得到数组中最靠前的结果,最多六个条目:

{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }

正如您在此处看到的,非常有趣。

在您最初分组后,您基本上希望将$first 值“弹出”出堆栈以获取数组结果。为了简化这个过程,我们实际上是在初始操作中这样做的。于是流程变成了:

  • $unwind数组
  • $eq 相等匹配中已经看到的值进行比较
  • $sort 结果“浮动”false 未见过的值到顶部(这仍然保留顺序)
  • $group 再次返回并“弹出”$first 未见过的值作为堆栈中的下一个成员。这也使用$cond 运算符将数组堆栈中的“已见”值替换为false,以帮助进行评估。

$cond 的最后一项操作是确保未来的迭代不仅仅是在“切片”计数大于数组成员的情况下一遍又一遍地添加数组的最后一个值。

整个过程需要针对您希望“切片”的任意数量的项目重复。由于我们已经在初始分组中找到了“第一个”项,这意味着需要 n-1 迭代以获得所需的切片结果。

最后的步骤实际上只是将所有内容转换回数组以获得最终结果的可选说明。所以真的只是有条件地将项目或false 推回它们的匹配位置,最后“过滤”出所有false 值,因此最终数组分别具有“六个”和“五个”成员。

因此没有标准的运算符来适应这一点,您不能仅仅将推送“限制”为 5 或 10 或数组中的任何项目。但如果你真的必须这样做,那么这是你最好的方法。


您可以使用 mapReduce 来解决这个问题,并一起放弃聚合框架。我会采取的方法(在合理的范围内)是在服务器上有效地拥有一个内存中的哈希映射并将数组累积到那个位置,同时使用 JavaScript 切片来“限制”结果:

db.messages.mapReduce(
    function () {

        if ( !stash.hasOwnProperty(this.conversation_ID) ) {
            stash[this.conversation_ID] = [];
        }

        if ( stash[this.conversation_ID.length < maxLen ) {
            stash[this.conversation_ID].push( this._id );
            emit( this.conversation_ID, 1 );
        }

    },
    function(key,values) {
        return 1;   // really just want to keep the keys
    },
    { 
        "scope": { "stash": {}, "maxLen": 10 },
        "finalize": function(key,value) {
            return { "msgs": stash[key] };                
        },
        "out": { "inline": 1 }
    }
)

这样就基本上构建了与发出的“键”匹配的“内存中”对象,其中一个数组永远不会超过您希望从结果中获取的最大大小。此外,当达到最大堆栈时,这甚至不会费心“发射”项目。

reduce 部分实际上除了归约到“键”和单个值之外什么也没做。因此,以防万一我们的 reducer 没有被调用,如果一个键只存在 1 个值,那么 finalize 函数会负责将“存储”键映射到最终输出。

其效果因输出大小而异,JavaScript 求值肯定不会很快,但可能比在管道中处理大型数组要快。


投票给JIRA issues 以实际使用“切片”运算符,甚至对“$push”和“$addToSet”进行“限制”,这都很方便。个人希望至少可以对$map运算符进行一些修改,以便在处理时暴露“当前索引”值。这将有效地允许“切片”和其他操作。

确实,您可能希望对此进行编码以“生成”所有必需的迭代。如果这里的答案得到了足够的爱和/或其他时间等待我学习,那么我可能会添加一些代码来演示如何做到这一点。这已经是一个相当长的响应了。


生成管道的代码:

var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;

var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": []  } } };

for ( var x = 1; x <= maxLen; x++ ) {

    fproj["$project"][""+x] = 1;
    fproj["$project"]["pos"]["$const"].push( x );

    var rec = {
        "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
    };
    if ( stack.length == 0 ) {
        rec["$cond"].push( false );
    } else {
        lval = stack.pop();
        rec["$cond"].push( lval );
    }

    stack.push( rec );

    if ( x == 1) {
        pipe.push({ "$group": {
           "_id": key,
           "1": { "$first": val },
           "msgs": { "$push": val }
        }});
    } else {
        pipe.push({ "$unwind": "$msgs" });
        var proj = {
            "$project": {
                "msgs": 1
            }
        };
        
        proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };
       
        var grp = {
            "$group": {
                "_id": "$_id",
                "msgs": {
                    "$push": {
                        "$cond": [ { "$not": "$seen" }, "$msgs", false ]
                    }
                }
            }
        };

        for ( n=x; n >= 1; n-- ) {
            if ( n != x ) 
                proj["$project"][""+n] = 1;
            grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
        }

        pipe.push( proj );
        pipe.push({ "$sort": { "seen": 1 } });
        pipe.push(grp);
    }
}

pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": stack[0] }
    }
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }
}); 

这构建了直到maxLen 的基本迭代方法,步骤从$unwind$group。还嵌入了所需的最终预测和“嵌套”条件语句的详细信息。最后基本就是对这个问题采取的方法:

Does MongoDB's $in clause guarantee order?

【讨论】:

  • 确实很有趣。是的,我主要担心的是不必要地获取大量数据,这在处理大量数据时是一个严重的问题。我非常感谢您如何尝试解释有关此问题的每个细节。尽管我了解解决问题的基本方法,但我必须阅读文档才能理解所有内容。感觉更像是一种 hack :) 在解决确切的问题陈述的同时使其工作。我希望您有时间添加更多信息,因为我觉得这是一个普遍的用例,它可以帮助很多 mongo 开发人员。
  • 我在 nodejs 上使用猫鼬。我感觉像 10-20 这样更大的数字,可以编写一个函数来生成查询 JSON,但对于更大的数字,它可能会变得太复杂。
  • @ma08 绝对正确,这是运行这些结果的耻辱,您可以单独执行这些结果,也可以将其转储到集合中并使用 $slice 和 find。如果数组变大甚至可能根据数据打破每个文档的 BSON 限制,则后者不是非常理想的。所以整体解决方案是可行的,但有局限性。我认为大多数人都会同意我们真正需要的是添加到 push 的“限制”选项。这限制了数组结果的大小。我看看能不能在几个小时后在这里添加一些生成代码。
  • @ma08 添加了代码。还要早一点为此使用 mapReduce 方法。
  • 感谢您的意见!!。但是我觉得 mapReduce 几乎永远不会被使用(用于分组消息的代码)将用于发送对客户端请求的响应。有什么建议吗?出于某种原因,标记似乎对我不起作用
【解决方案2】:

Mongo 4.4 开始,$group 阶段有一个新的聚合运算符 $accumulator,允许通过 javascript 用户定义的函数在文档分组时自定义累积文档。

因此,为了只为每个对话选择 n 条消息(例如 2 条):

// { "conversationId" : 3, "messageId" : 14 }
// { "conversationId" : 5, "messageId" : 34 }
// { "conversationId" : 3, "messageId" : 39 }
// { "conversationId" : 3, "messageId" : 47 }
db.collection.aggregate([
  { $group: {
    _id: "$conversationId",
    messages: {
      $accumulator: {
        accumulateArgs: ["$messageId"],
        init: function() { return [] },
        accumulate:
          function(messages, message) { return messages.concat(message).slice(0, 2); },
        merge:
          function(messages1, messages2) { return messages1.concat(messages2).slice(0, 2); },
        lang: "js"
      }
    }
  }}
])
// { "_id" : 5, "messages" : [ 34 ] }
// { "_id" : 3, "messages" : [ 14, 39 ] }

累加器:

  • 在场上积累messageId (accumulateArgs)
  • 被初始化为一个空数组 (init)
  • 在一个数组中累积messageId项,最多只保留2个(accumulatemerge

【讨论】:

    【解决方案3】:

    Mongo 5.2release schedule开始,这是新的$topN聚合累加器的完美用例:

    // { "conversationId" : 3, "messageId" : 14 }
    // { "conversationId" : 5, "messageId" : 34 }
    // { "conversationId" : 3, "messageId" : 39 }
    // { "conversationId" : 3, "messageId" : 47 }
    db.collection.aggregate([
      { $group: {
        _id: "$conversationId",
        messages: { $topN: { n: 2, output: "$messageId", sortBy: { _id: 1 } } }
      }}
    ])
    // { "_id" : 5, "messages" : [ 34 ] }
    // { "_id" : 3, "messages" : [ 14, 39 ] }
    

    这应用了$topN 组累积:

    • 为每个组获取前 2 个 (n: 2) 元素
    • 并为每个分组记录提取字段value (output: "$messageId")
    • “前 2 名”的选择由 sortBy: { _id: 1 } 定义(我选择为 _id,因为您没有指定订单)。

    【讨论】:

      【解决方案4】:

      $slice 运算符不是聚合运算符,因此您不能 这样做(就像我在此答案中建议的那样,在编辑之前):

      db.messages.aggregate([
         { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
         { $project : { _id : 1, msgs : { $slice : 10 }}}]);
      

      Neil 的回答非常详细,但您可以使用稍微不同的方法(如果它适合您的用例)。您可以汇总结果并将其输出到新集合中:

      db.messages.aggregate([
         { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
         { $out : "msgs_agg" }
      ]);
      

      $out 运算符会将聚合结果写入新集合。然后,您可以使用 $slice 运算符使用常规查找查询项目您的结果:

      db.msgs_agg.find({}, { msgs : { $slice : 10 }});
      

      对于本测试文档:

      > db.messages.find().pretty();
      { "_id" : 1, "conversation_ID" : 123 }
      { "_id" : 2, "conversation_ID" : 123 }
      { "_id" : 3, "conversation_ID" : 123 }
      { "_id" : 4, "conversation_ID" : 123 }
      { "_id" : 5, "conversation_ID" : 123 }
      { "_id" : 7, "conversation_ID" : 1234 }
      { "_id" : 8, "conversation_ID" : 1234 }
      { "_id" : 9, "conversation_ID" : 1234 }
      

      结果将是:

      > db.msgs_agg.find({}, { msgs : { $slice : 10 }});
      { "_id" : 1234, "msgs" : [ { "msgid" : 7 }, { "msgid" : 8 }, { "msgid" : 9 } ] }
      { "_id" : 123, "msgs" : [ { "msgid" : 1 }, { "msgid" : 2 }, { "msgid" : 3 }, 
                                { "msgid" : 4 }, { "msgid" : 5 } ] }
      

      编辑

      我认为这意味着复制整个消息集合。 这不是矫枉过正吗?

      嗯,显然这种方法不适用于庞大的集合。但是,由于您正在考虑使用大型聚合管道或大型 map-reduce 作业,因此您可能不会将其用于“实时”请求。

      这种方法有很多缺点:16 MB BSON 限制,如果您要创建具有聚合的大型文档、重复浪费磁盘空间/内存、增加磁盘 IO...

      这种方法的优点:实施简单,因此易于更改。如果您的集合很少更新,您可以像缓存一样使用这个“out”集合。这样您就不必多次执行聚合操作,然后您甚至可以在“out”集合上支持“实时”客户端请求。要刷新数据,您可以定期进行聚合(例如,在每晚运行的后台作业中)。

      就像在 cmets 中所说的那样,这不是一个简单的问题,而且还没有一个完美的解决方案(还没有!)。我向您展示了另一种您可以使用的方法,由您进行基准测试并决定最适合您的用例的方法。

      【讨论】:

      • { [MongoError: exception: invalid operator '$slice'] name: 'MongoError', errmsg: 'exception: invalid operator \'$slice\'', code: 15999, ok: 0 }我正在为 nodejs 使用猫鼬。 $slice 似乎不适用于聚合。而且我想限制 mongoDB 在达到特定限制后停止分组,而不是对结果进行切片。有没有办法做到这一点?谢谢
      • 我认为这意味着复制整个消息集合。这不是矫枉过正吗?
      • @ma08 取决于您的要求。查看我的编辑。
      【解决方案5】:

      我希望这会如你所愿:

      db.messages.aggregate([
         { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
         { $project : { _id : 1, msgs : { $slice : ["$msgid",0,10] }}}
      ]);
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-03-27
        • 1970-01-01
        • 2013-01-12
        • 1970-01-01
        • 2013-12-05
        • 2013-07-31
        • 1970-01-01
        相关资源
        最近更新 更多