【问题标题】:Keep N most Recent Records by Group While Removing Others按组保留 N 个最近的记录,同时删除其他记录
【发布时间】:2017-08-30 15:19:58
【问题描述】:

MongoDB 中是否有一种方法可以识别(以便删除)除每组最近的 N 条记录之外的所有记录?

我们有一个 MongoDB 集合,其中保存了“文档”和“日志”的集合。每个日志都保留一个对一个 docId 和一个日期的引用:

  • 文档({_id: ObjectId})
  • 登录({ docId: String, date: Date})

日志数据库变得越来越大。我想删除所有日志,但以下任一日志除外:

  • 不到 30 天,或
  • 在给定 docId 的最近 12 条日志中

我知道如何删除所有超过 30 天的日志。但我不知道如何保存文档的最新 N 个日志。

【问题讨论】:

  • 好的。 “每个分组键的前 N ​​个”可能是一个棘手的问题,我之前在这里写过。如果它真的是“巨大的”,那么你最好为此运行并行查询。什么是编程语言环境?还是您愿意将其作为一个完全独立的进程运行?您是否也可能愿意修改“doc”集合文档以实际上始终“存储一个数组”对“12 个最新”的引用?因为这也可以做到,而且实际上是一个真正的资源节约者。

标签: mongodb mongodb-query


【解决方案1】:

在单个语句中无法完成。您可以做的是基本上确定每个可能的 "docId" 值的“最后 12 个”,然后在发出删除文档的请求时通过将列表添加到 $nin 来排除这些文档。

您没有指定首选的编程环境,但这里是使用 nodejs 的一般过程:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    let log = db.collection('log');
    let doc = db.collection('doc');

    await new Promise((resolve,reject) => {

      let ops = [];

      let stream = doc.find();

      stream.on('error', reject);
      stream.on('end', async () => {
        if ( ops.length > 0 ) {
          await log.bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {

        // Pause processing input stream
        stream.pause();

        // get last 12 for doc
        let last = await (log.find({ docId: data._id })
          .project({ _id: 1 })
          .sort({ date: -1 }).limit(12)).map(d => d._id);

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: last },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        if ( ops.length >= 1000 ) {
          await log.bulkWrite(ops);
          ops = [];
        }

        // Resume processing input stream
        stream.resume()

      });

    });

  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

这里的基本前提是您遍历“doc”集合中的文档,然后对“log”集合执行查询,以返回 12 个最近的文档。然后,我们从找到的每个文档(如果有)中列出 _id 值。

这样做的目的是接下来要做的是对数据库发出deleteMany 操作。因为会有很多这样的我们将使用.bulkWrite() 而不是每次迭代源文档时都发出请求。这大大减少了网络流量和延迟。

然后,基本语句是从光标中的源中删除"docId" 与当前文档匹配的所有文档,并且日期早于 30 天的截止点。

附加条件使用$nin 来“排除”任何在前一个查询中标识为“最近12 个”的文档。这样可以确保始终保留这些文档,因为它们不会被删除。

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: last },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

这就是它的全部内容。其余的处理是关于累积“批处理”直到有 1000 个条目(合理的大小,但是任何低于 16MB BSON 限制的请求都是可能的),当实际请求被发送到服务器以处理并实际删除文件。

当光标用完时,进程完成,所有剩余的“批处理”指令都将被提交。


MongoDB 3.6 预览版

您可以从当前“即将发布”的 MongoDB 版本中获得的一件事是,它允许 $lookup 的“非相关”形式,这意味着我们基本上可以获得每个目标文档的“前 12 个”单个请求而不是发出多个查询。

这样做是因为$lookup 的这种形式将“管道”作为参数,而不是基于本地和外键匹配的固定输出。这让我们可以$match$sort$limit 返回结果。

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    await new Promise((resolve,reject) => {

      let ops = [];

      let stream = db.collection('doc').aggregate([
        { "$lookup": {
          "from": "log",
          "let": {
            "id": "$_id"
          },
          "pipeline": [
            { "$match": {
              "docId": { "$eq": { "$expr": "$$id" } }
            }},
            { "$sort": { "date": -1 } },
            { "$limit": 12 },
            { "$project": { "_id": 1 } }
          ],
          "as": 'docs'
        }},
      ]);

      stream.on('error', reject);
      stream.on('end', async () => {
        if ( ops.length > 0 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {
        stream.pause();

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: data.docs.map(d => d._id) },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        if ( ops.length >= 1000 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }
        stream.resume();

      });

    });


  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

这其中的关键是$expr,它仅在 3.5.12 开发版本中最终确定。这允许高效的$match 表达式,然后使其成为处理单独查询的可行替代方案。

当然,您真的很想等待它准备好投入生产。但最好能意识到这一点,这样您就可以在最终升级底层 MongoDB 时过渡到这样的过程。

【讨论】:

  • 这是一篇很棒的文章!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-12-20
  • 1970-01-01
  • 2012-04-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多