【问题标题】:Aggregate out from Multiple Collections从多个集合中聚合出来
【发布时间】:2015-11-26 15:52:51
【问题描述】:

如何在不替换另一个聚合输出的集合的情况下将 MongoDB 聚合的结果输出到集合中?

我只需要使用 $out: 'tempCollection' 来获取数据,因为我有 5 亿个文档,并且正在获取 pipeline stage limit

var q = [
  {$match: query},
  {$group: {_id: '$hash'}},
  {$out: 'tempCollection'}
];

async.parallel([
  function(callback) {
    firstCollection.aggregate(q, callback);
  },
  function(callback) {
    secondCollection.aggregate(q, callback);
  },

  ...

], function() {

  // I want to get all from tempCollection (with pagination) here

});

【问题讨论】:

  • 您的问题结构错误。 $out 总是 替换。你真的想在这里做什么? “添加”两个结果都在一个集合中?还是基于某些共同值“合并”两个结果中的其他值?还要具体说明这是基本节点驱动程序还是其他类似猫鼬、僧侣或其他的东西。
  • 我使用猫鼬。我需要以任何方式(合并或写入一个集合等)获取所有不同的哈希值。
  • 选择一个。 “合并” - 意味着您有一个共同的“键”或构成“键”的字段,并且您打算在找到相同键的地方“增加”其他值。 “连接” - 意味着您只希望两组结果最终都在一个集合中。请注意,在后者中,“钥匙”确实需要不同,或者是人为制造的。
  • 如果在 firstCollection 和 secondCollection 中具有相同的“哈希” - 我只需要得到一个(唯一的)结果。所以我使用聚合,因为 mongoose.distinct 用作​​聚合包装器,并且有管道阶段限制

标签: node.js mongodb mongoose mongodb-query aggregation-framework


【解决方案1】:

这里的底线是$out 选项只会“替换”目标集合上的输出。因此,要执行其他任何操作,您必须通过客户端连接工作,而不仅仅是输出到服务器。

使用 mongoose 的最佳选择是直接进入底层驱动程序并在驱动程序支持的情况下访问 node stream interface

简单的例子,但它展示了基本的结构方式:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/aggtest');

var testSchema = new Schema({},{ "_id": false, strict: false });


var ModelA = mongoose.model( 'ModelA', testSchema ),
    ModelB = mongoose.model( 'ModelB', testSchema ),
    ModelC = mongoose.model( 'ModelC', testSchema );

function processCursor(cursor,target,callback) {

  cursor.on("end",callback);
  cursor.on("error",callback);

  cursor.on("data",function(data) {
    cursor.pause();
    target.update(
      { "_id": data._id },
      { "$setOnInsert": { "_id": data._id } },
      { "upsert": true },
      function(err) {
        if (err) callback(err);
        cursor.resume();
      }
    );
  });
}

async.series(
  [
    // Clean data
    function(callback) {
      async.each([ModelA,ModelB,ModelC],function(model,callback) {
        model.remove({},callback);
      },callback);
    },

    // Sample data
    function(callback) {
      async.each([ModelA,ModelB],function(model,callback) {
        async.each([1,2,3],function(id,callback) {
          model.create({ "_id": id },callback);
        },callback);
      },callback);
    },

    // Run merge
    function(callback) {
      async.parallel(
        [
          function(callback) {
            var cursor = ModelA.collection.aggregate(
              [
                { "$group": { "_id": "$_id" } }
              ],
              { "batchSize": 25 }
            );

            processCursor(cursor,ModelC,callback)
          },
          function(callback) {

            var cursor = ModelB.collection.aggregate(
              [
                { "$group": { "_id": "$_id" } }
              ],
              { "batchSize": 25 }
            );

            processCursor(cursor,ModelC,callback)
          }
        ],
        callback
      );
    },

    // Get merged
    function(callback) {
      ModelC.find({},function(err,results) {
        console.log(results);
        callback(err);
      });
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);

除此之外,您将需要$out 来“分离”集合,然后将它们与类似的.update() 进程合并,但要保持它“服务器端”,那么您需要使用@ 987654323@。

这不是很好,但这是在服务器上保持操作的唯一方法。您还可以使用"Bulk" 操作(再次通过相同的本机.collection 接口)修改它以获得更多吞吐量。但选项归结为“读取客户端”或“评估”。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-13
    • 2021-10-25
    • 1970-01-01
    • 2022-09-23
    • 2023-03-31
    • 1970-01-01
    • 2019-04-24
    • 2020-02-16
    相关资源
    最近更新 更多