在单个语句中无法完成。您可以做的是基本上确定每个可能的 "docId" 值的“最后 12 个”,然后在发出删除文档的请求时通过将列表添加到 $nin 来排除这些文档。
您没有指定首选的编程环境,但这里是使用 nodejs 的一般过程:
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';
(async function() {
let db;
// Calculate date cutoff
let oneDay = 1000 * 60 * 60 * 24,
thirtyDays = oneDay * 30,
now = Date.now(),
cutoff = new Date(
( now - ( now % oneDay ) ) - thirtyDays
);
try {
db = await MongoClient.connect(uri);
let log = db.collection('log');
let doc = db.collection('doc');
await new Promise((resolve,reject) => {
let ops = [];
let stream = doc.find();
stream.on('error', reject);
stream.on('end', async () => {
if ( ops.length > 0 ) {
await log.bulkWrite(ops);
ops = [];
}
resolve();
});
stream.on('data', async (data) => {
// Pause processing input stream
stream.pause();
// get last 12 for doc
let last = await (log.find({ docId: data._id })
.project({ _id: 1 })
.sort({ date: -1 }).limit(12)).map(d => d._id);
ops.push({
deleteMany: {
filter: {
_id: { $nin: last },
docId: data._id,
date: { $lt: cutoff }
}
}
});
if ( ops.length >= 1000 ) {
await log.bulkWrite(ops);
ops = [];
}
// Resume processing input stream
stream.resume()
});
});
} catch(e) {
console.error(e);
} finally {
db.close();
}
})();
这里的基本前提是您遍历“doc”集合中的文档,然后对“log”集合执行查询,以返回 12 个最近的文档。然后,我们从找到的每个文档(如果有)中列出 _id 值。
这样做的目的是接下来要做的是对数据库发出deleteMany 操作。因为会有很多这样的我们将使用.bulkWrite() 而不是每次迭代源文档时都发出请求。这大大减少了网络流量和延迟。
然后,基本语句是从光标中的源中删除"docId" 与当前文档匹配的所有文档,并且日期早于 30 天的截止点。
附加条件使用$nin 来“排除”任何在前一个查询中标识为“最近12 个”的文档。这样可以确保始终保留这些文档,因为它们不会被删除。
ops.push({
deleteMany: {
filter: {
_id: { $nin: last },
docId: data._id,
date: { $lt: cutoff }
}
}
});
这就是它的全部内容。其余的处理是关于累积“批处理”直到有 1000 个条目(合理的大小,但是任何低于 16MB BSON 限制的请求都是可能的),当实际请求被发送到服务器以处理并实际删除文件。
当光标用完时,进程完成,所有剩余的“批处理”指令都将被提交。
MongoDB 3.6 预览版
您可以从当前“即将发布”的 MongoDB 版本中获得的一件事是,它允许 $lookup 的“非相关”形式,这意味着我们基本上可以获得每个目标文档的“前 12 个”单个请求而不是发出多个查询。
这样做是因为$lookup 的这种形式将“管道”作为参数,而不是基于本地和外键匹配的固定输出。这让我们可以$match、$sort 和$limit 返回结果。
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';
(async function() {
let db;
// Calculate date cutoff
let oneDay = 1000 * 60 * 60 * 24,
thirtyDays = oneDay * 30,
now = Date.now(),
cutoff = new Date(
( now - ( now % oneDay ) ) - thirtyDays
);
try {
db = await MongoClient.connect(uri);
await new Promise((resolve,reject) => {
let ops = [];
let stream = db.collection('doc').aggregate([
{ "$lookup": {
"from": "log",
"let": {
"id": "$_id"
},
"pipeline": [
{ "$match": {
"docId": { "$eq": { "$expr": "$$id" } }
}},
{ "$sort": { "date": -1 } },
{ "$limit": 12 },
{ "$project": { "_id": 1 } }
],
"as": 'docs'
}},
]);
stream.on('error', reject);
stream.on('end', async () => {
if ( ops.length > 0 ) {
await db.collection('log').bulkWrite(ops);
ops = [];
}
resolve();
});
stream.on('data', async (data) => {
stream.pause();
ops.push({
deleteMany: {
filter: {
_id: { $nin: data.docs.map(d => d._id) },
docId: data._id,
date: { $lt: cutoff }
}
}
});
if ( ops.length >= 1000 ) {
await db.collection('log').bulkWrite(ops);
ops = [];
}
stream.resume();
});
});
} catch(e) {
console.error(e);
} finally {
db.close();
}
})();
这其中的关键是$expr,它仅在 3.5.12 开发版本中最终确定。这允许高效的$match 表达式,然后使其成为处理单独查询的可行替代方案。
当然,您真的很想等待它准备好投入生产。但最好能意识到这一点,这样您就可以在最终升级底层 MongoDB 时过渡到这样的过程。