现代
从 MongoDB 3.6 开始,有一种“新颖”的方法,即使用 $lookup 执行“自连接”,其方式与下面演示的原始游标处理方式大致相同。
由于在此版本中,您可以将$lookup 的"pipeline" 参数指定为“加入”的来源,这实质上意味着您可以使用$match 和$limit 来收集和“限制”数组:
db.messages.aggregate([
{ "$group": { "_id": "$conversation_ID" } },
{ "$lookup": {
"from": "messages",
"let": { "conversation": "$_id" },
"pipeline": [
{ "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
{ "$limit": 10 },
{ "$project": { "_id": 1 } }
],
"as": "msgs"
}}
])
您可以选择在$lookup 之后添加额外的投影,以使数组项只是值而不是带有_id 键的文档,但只需执行上述操作即可获得基本结果。
仍然有出色的SERVER-9277 直接请求“限制推送”,但在此期间以这种方式使用$lookup 是一个可行的替代方案。
注意:还有$slice,是在写完原答案后介绍的,原内容中“突出JIRA问题”提到。虽然您可以使用较小的结果集获得相同的结果,但它仍然涉及“将所有内容”“推入”数组,然后将最终数组输出限制为所需的长度。
这就是主要区别,也是为什么 $slice 对于大结果通常不实用的原因。但当然可以在它的情况下交替使用。
mongodb group values by multiple fields 上还有更多关于这两种用法的详细信息。
原创
如前所述,这并非不可能,但肯定是一个可怕的问题。
实际上,如果您主要担心生成的数组会非常大,那么您最好的方法是将每个不同的“conversation_ID”作为单独的查询提交,然后合并您的结果。在非常 MongoDB 2.6 的语法中,可能需要根据您的语言实现的实际情况进行一些调整:
var results = [];
db.messages.aggregate([
{ "$group": {
"_id": "$conversation_ID"
}}
]).forEach(function(doc) {
db.messages.aggregate([
{ "$match": { "conversation_ID": doc._id } },
{ "$limit": 10 },
{ "$group": {
"_id": "$conversation_ID",
"msgs": { "$push": "$_id" }
}}
]).forEach(function(res) {
results.push( res );
});
});
但这一切都取决于您是否要避免这种情况。等到真正的答案:
这里的第一个问题是没有函数可以“限制”“推入”数组的项目数量。这当然是我们想要的,但该功能目前不存在。
第二个问题是,即使将所有项目推入数组,也不能在聚合管道中使用$slice 或任何类似的运算符。因此,目前没有办法通过简单的操作从生成的数组中获取“前 10”个结果。
但您实际上可以生成一组操作来有效地在分组边界上“切片”。它相当复杂,例如在这里我将把数组元素“切片”减少到“六个”。这里的主要原因是演示该过程并展示如何在不破坏不包含您要“切片”到的总数的数组的情况下执行此操作。
给定一个文档样本:
{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }
您可以在那里看到,当按您的条件分组时,您将获得一个包含十个元素的数组和另一个包含“五个”元素的数组。您在这里想要做的事情将两者都减少到前“六个”,而不会“破坏”只匹配“五个”元素的数组。
还有以下查询:
db.messages.aggregate([
{ "$group": {
"_id": "$conversation_ID",
"first": { "$first": "$_id" },
"msgs": { "$push": "$_id" },
}},
{ "$unwind": "$msgs" },
{ "$project": {
"msgs": 1,
"first": 1,
"seen": { "$eq": [ "$first", "$msgs" ] }
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
},
"first": { "$first": "$first" },
"second": { "$first": "$msgs" }
}},
{ "$unwind": "$msgs" },
{ "$project": {
"msgs": 1,
"first": 1,
"second": 1,
"seen": { "$eq": [ "$second", "$msgs" ] }
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
},
"first": { "$first": "$first" },
"second": { "$first": "$second" },
"third": { "$first": "$msgs" }
}},
{ "$unwind": "$msgs" },
{ "$project": {
"msgs": 1,
"first": 1,
"second": 1,
"third": 1,
"seen": { "$eq": [ "$third", "$msgs" ] },
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
},
"first": { "$first": "$first" },
"second": { "$first": "$second" },
"third": { "$first": "$third" },
"forth": { "$first": "$msgs" }
}},
{ "$unwind": "$msgs" },
{ "$project": {
"msgs": 1,
"first": 1,
"second": 1,
"third": 1,
"forth": 1,
"seen": { "$eq": [ "$forth", "$msgs" ] }
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
},
"first": { "$first": "$first" },
"second": { "$first": "$second" },
"third": { "$first": "$third" },
"forth": { "$first": "$forth" },
"fifth": { "$first": "$msgs" }
}},
{ "$unwind": "$msgs" },
{ "$project": {
"msgs": 1,
"first": 1,
"second": 1,
"third": 1,
"forth": 1,
"fifth": 1,
"seen": { "$eq": [ "$fifth", "$msgs" ] }
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
},
"first": { "$first": "$first" },
"second": { "$first": "$second" },
"third": { "$first": "$third" },
"forth": { "$first": "$forth" },
"fifth": { "$first": "$fifth" },
"sixth": { "$first": "$msgs" },
}},
{ "$project": {
"first": 1,
"second": 1,
"third": 1,
"forth": 1,
"fifth": 1,
"sixth": 1,
"pos": { "$const": [ 1,2,3,4,5,6 ] }
}},
{ "$unwind": "$pos" },
{ "$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [
{ "$eq": [ "$pos", 1 ] },
"$first",
{ "$cond": [
{ "$eq": [ "$pos", 2 ] },
"$second",
{ "$cond": [
{ "$eq": [ "$pos", 3 ] },
"$third",
{ "$cond": [
{ "$eq": [ "$pos", 4 ] },
"$forth",
{ "$cond": [
{ "$eq": [ "$pos", 5 ] },
"$fifth",
{ "$cond": [
{ "$eq": [ "$pos", 6 ] },
"$sixth",
false
]}
]}
]}
]}
]}
]
}
}
}},
{ "$unwind": "$msgs" },
{ "$match": { "msgs": { "$ne": false } }},
{ "$group": {
"_id": "$_id",
"msgs": { "$push": "$msgs" }
}}
])
你会得到数组中最靠前的结果,最多六个条目:
{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }
正如您在此处看到的,非常有趣。
在您最初分组后,您基本上希望将$first 值“弹出”出堆栈以获取数组结果。为了简化这个过程,我们实际上是在初始操作中这样做的。于是流程变成了:
$cond 的最后一项操作是确保未来的迭代不仅仅是在“切片”计数大于数组成员的情况下一遍又一遍地添加数组的最后一个值。
整个过程需要针对您希望“切片”的任意数量的项目重复。由于我们已经在初始分组中找到了“第一个”项,这意味着需要 n-1 迭代以获得所需的切片结果。
最后的步骤实际上只是将所有内容转换回数组以获得最终结果的可选说明。所以真的只是有条件地将项目或false 推回它们的匹配位置,最后“过滤”出所有false 值,因此最终数组分别具有“六个”和“五个”成员。
因此没有标准的运算符来适应这一点,您不能仅仅将推送“限制”为 5 或 10 或数组中的任何项目。但如果你真的必须这样做,那么这是你最好的方法。
您可以使用 mapReduce 来解决这个问题,并一起放弃聚合框架。我会采取的方法(在合理的范围内)是在服务器上有效地拥有一个内存中的哈希映射并将数组累积到那个位置,同时使用 JavaScript 切片来“限制”结果:
db.messages.mapReduce(
function () {
if ( !stash.hasOwnProperty(this.conversation_ID) ) {
stash[this.conversation_ID] = [];
}
if ( stash[this.conversation_ID.length < maxLen ) {
stash[this.conversation_ID].push( this._id );
emit( this.conversation_ID, 1 );
}
},
function(key,values) {
return 1; // really just want to keep the keys
},
{
"scope": { "stash": {}, "maxLen": 10 },
"finalize": function(key,value) {
return { "msgs": stash[key] };
},
"out": { "inline": 1 }
}
)
这样就基本上构建了与发出的“键”匹配的“内存中”对象,其中一个数组永远不会超过您希望从结果中获取的最大大小。此外,当达到最大堆栈时,这甚至不会费心“发射”项目。
reduce 部分实际上除了归约到“键”和单个值之外什么也没做。因此,以防万一我们的 reducer 没有被调用,如果一个键只存在 1 个值,那么 finalize 函数会负责将“存储”键映射到最终输出。
其效果因输出大小而异,JavaScript 求值肯定不会很快,但可能比在管道中处理大型数组要快。
投票给JIRA issues 以实际使用“切片”运算符,甚至对“$push”和“$addToSet”进行“限制”,这都很方便。个人希望至少可以对$map运算符进行一些修改,以便在处理时暴露“当前索引”值。这将有效地允许“切片”和其他操作。
确实,您可能希望对此进行编码以“生成”所有必需的迭代。如果这里的答案得到了足够的爱和/或其他时间等待我学习,那么我可能会添加一些代码来演示如何做到这一点。这已经是一个相当长的响应了。
生成管道的代码:
var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;
var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": [] } } };
for ( var x = 1; x <= maxLen; x++ ) {
fproj["$project"][""+x] = 1;
fproj["$project"]["pos"]["$const"].push( x );
var rec = {
"$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
};
if ( stack.length == 0 ) {
rec["$cond"].push( false );
} else {
lval = stack.pop();
rec["$cond"].push( lval );
}
stack.push( rec );
if ( x == 1) {
pipe.push({ "$group": {
"_id": key,
"1": { "$first": val },
"msgs": { "$push": val }
}});
} else {
pipe.push({ "$unwind": "$msgs" });
var proj = {
"$project": {
"msgs": 1
}
};
proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };
var grp = {
"$group": {
"_id": "$_id",
"msgs": {
"$push": {
"$cond": [ { "$not": "$seen" }, "$msgs", false ]
}
}
}
};
for ( n=x; n >= 1; n-- ) {
if ( n != x )
proj["$project"][""+n] = 1;
grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
}
pipe.push( proj );
pipe.push({ "$sort": { "seen": 1 } });
pipe.push(grp);
}
}
pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
"$group": {
"_id": "$_id",
"msgs": { "$push": stack[0] }
}
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
"$group": {
"_id": "$_id",
"msgs": { "$push": "$msgs" }
}
});
这构建了直到maxLen 的基本迭代方法,步骤从$unwind 到$group。还嵌入了所需的最终预测和“嵌套”条件语句的详细信息。最后基本就是对这个问题采取的方法:
Does MongoDB's $in clause guarantee order?