【发布时间】:2026-01-14 07:35:01
【问题描述】:
文档如何在MongoDB中从一个集合移动到另一个集合?例如:我在集合 A 中有很多文档,我想将所有 1 个月前的文档移到集合 B(这些 1 个月前的文档不应该在集合 A 中)。
使用聚合,我们可以进行复制。但我想做的是移动文档。 可以用什么方法来移动文档?
【问题讨论】:
标签: mongodb
文档如何在MongoDB中从一个集合移动到另一个集合?例如:我在集合 A 中有很多文档,我想将所有 1 个月前的文档移到集合 B(这些 1 个月前的文档不应该在集合 A 中)。
使用聚合,我们可以进行复制。但我想做的是移动文档。 可以用什么方法来移动文档?
【问题讨论】:
标签: mongodb
第一个选项(使用 mongo 转储)
1.从集合中获取转储
mongodump -d db -c source_collection
2.从收藏中恢复
mongorestore -d db -c target_collection dir=dump/db_name/source_collection.bson
第二个选项
运行聚合
db.getCollection('source_collection').aggregate([ { $match: {"emailAddress" : "apitester@mailinator.com"} }, { $out: "target_collection" } ])
第三个选项(最慢)
运行一个for循环
db.getCollection('source_collection').find().forEach(function(docs){ db.getCollection('target_collection').insert(docs); }) print("回滚完成!");
【讨论】:
在我的情况下,每个都不起作用。所以我不得不做出一些改变。
var kittySchema = new mongoose.Schema({
name: String
});
var Kitten = mongoose.model('Kitten', kittySchema);
var catSchema = new mongoose.Schema({
name: String
});
var Cat = mongoose.model('Cat', catSchema);
这是两个集合的模型
`function Recursion(){
Kitten.findOne().lean().exec(function(error, results){
if(!error){
var objectResponse = results;
var RequiredId = objectResponse._id;
delete objectResponse._id;
var swap = new Cat(objectResponse);
swap.save(function (err) {
if (err) {
return err;
}
else {
console.log("SUCCESSFULL");
Kitten.deleteOne({ _id: RequiredId }, function(err) {
if (!err) {
console.log('notification!');
}
else {
return err;
}
});
Recursion();
}
});
}
if (err) {
console.log("No object found");
// return err;
}
})
}`
【讨论】:
这是对@jasongarber 答案的更新,它使用了更新的 mongo 'bulkWrite' 操作(Read docs here),并且还保持整个过程异步,因此您可以将它作为更广泛的脚本的一部分运行,这取决于它的完成情况。
async function moveDocuments (sourceCollection, targetCollection, filter) {
const sourceDocs = await sourceCollection.find(filter)
console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)
const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)
const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
await deleteDocuments(sourceCollection, targetDocs)
console.log('Done!')
}
async function insertDocuments (collection, documents) {
const insertedIds = []
const bulkWrites = []
await documents.forEach(doc => {
const {_id} = doc
insertedIds.push(_id)
bulkWrites.push({
replaceOne: {
filter: {_id},
replacement: doc,
upsert: true,
},
})
})
if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
return insertedIds
}
async function deleteDocuments (collection, documents) {
const bulkWrites = []
await documents.forEach(({_id}) => {
bulkWrites.push({
deleteOne: {
filter: {_id},
},
})
})
if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
}
【讨论】:
插入和删除:
var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
db.collectionB.insert(doc);
db.collectionA.remove(doc);
});
注意:对于大型集合或包含大型文档的集合,此方法可能会很慢。
【讨论】:
}); 而不仅仅是}。缺少右括号。
更新 2
请不要再赞成这个答案了。正如所写的@jasongarber's answer 在任何方面都更好。
更新
This answer by @jasongarber 是一种更安全的方法,应该代替我使用。
如果我做对了,并且您想移动所有超过 1 个月的文档,并且您使用的是 mongoDB 2.6,那么没有理由不使用批量操作,这是我所知道的进行多项操作的最有效方式:
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
> bulkInsert.execute()
> bulkRemove.execute()
这应该很快,它的优点是万一在批量插入过程中出现问题,原始数据仍然存在。
编辑
为了防止内存占用过多,您可以对每个处理的x docs 执行批量操作:
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
counter ++
if( counter % x == 0){
bulkInsert.execute()
bulkRemove.execute()
bulkInsert = db.target.initializeUnorderedBulkOp()
bulkRemove = db.source.initializeUnorderedBulkOp()
}
}
)
> bulkInsert.execute()
> bulkRemove.execute()
【讨论】:
Fatal error in CALL_AND_RETRY_2 # Allocation failed - process out of memory。这是在具有 32GB 内存的服务器上,记录只有 5 个字段。集合的总数据大小只有 5GB 左右。
我有 1500 万个文档的 2297 个集合,但有些集合是空的。
仅使用 copyTo 脚本失败,但使用此脚本优化:
db.getCollectionNames().forEach(function(collname) {
var c = db.getCollection(collname).count();
if(c!==0){
db.getCollection(collname).copyTo('master-collection');
print('Copied collection ' + collname);
}
});
对我来说一切都很好。
注意:不推荐使用 copyTo,因为它会阻止读/写操作:所以如果您知道数据库在此操作期间不可用,我认为没问题。
【讨论】:
我计划使用 pymongo 的 bulkinsert 和 bulkdelete 方法一次归档 1000 条记录。
对于源和目标
创建 mongodb 对象以连接到数据库。
实例化批量对象。注意:我也创建了批量对象的备份。这将帮助我在发生错误时回滚插入或删除。 示例:
对于来源
// replace this with mongodb object creation logic
source_db_obj = db_help.create_db_obj(source_db, source_col)
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
对于目标
// replace this with mogodb object creation logic
target_db_obj = db_help.create_db_obj(target_db, target_col)
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
获取符合过滤条件的源记录
source_find_results = source_db_obj.find(过滤器)
遍历源记录
创建目标和源批量操作
将带有当前日期时间的 archived_at 字段附加到目标集合
//replace this with the logic to obtain the UTCtime.
doc['archived_at'] = db_help.getUTCTime()
target_bulk.insert(document)
source_bulk.remove(document)
为了在出现任何错误或异常的情况下进行回滚,请创建 target_bulk_bak 和 source_bulk_bak 操作。
target_bulk_bak.find({'_id':doc['_id']}).remove_one()
source_bulk_bak.insert(doc)
//remove the extra column
doc.pop('archieved_at', None)
当记录数达到1000时,执行目标-批量插入和源-批量删除。注意:此方法需要 target_bulk 和 source_bulk 对象来执行。
execute_bulk_insert_remove(source_bulk, target_bulk)
发生异常时,执行 target_bulk_bak 移除和 source_bulk_bak 插入。这将回滚更改。由于 mongodb 没有回滚功能,所以我想出了这个 hack
execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
最后重新初始化源和目标 bulk 和 bulk_bak 对象。这是必要的,因为您只能使用它们一次。
完整代码
def execute_bulk_insert_remove(source_bulk, target_bulk):
try:
target_bulk.execute()
source_bulk.execute()
except BulkWriteError as bwe:
raise Exception(
"could not archive document, reason: {}".format(bwe.details))
def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col):
"""
filter: filter criteria for backup
source_db: source database name
source_col: source collection name
target_db: target database name
target_col: target collection name
"""
count = 0
bulk_count = 1000
source_db_obj = db_help.create_db_obj(source_db, source_col)
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
target_db_obj = db_help.create_db_obj(target_db, target_col)
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
source_find_results = source_db_obj.find(filter)
start = datetime.now()
for doc in source_find_results:
doc['archived_at'] = db_help.getUTCTime()
target_bulk.insert(doc)
source_bulk.find({'_id': doc['_id']}).remove_one()
target_bulk_bak.find({'_id': doc['_id']}).remove_one()
doc.pop('archieved_at', None)
source_bulk_bak.insert(doc)
count += 1
if count % 1000 == 0:
logger.info("count: {}".format(count))
try:
execute_bulk_insert_remove(source_bulk, target_bulk)
except BulkWriteError as bwe:
execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
logger.info("Bulk Write Error: {}".format(bwe.details))
raise
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
end = datetime.now()
logger.info("archived {} documents to {} in ms.".format(
count, target_col, (end - start)))
【讨论】:
@markus-w-mahlberg 显示的批量操作(和 @mark-mullin 精炼)是有效的,但在编写时不安全。如果 bulkInsert 失败,bulkRemove 仍将继续。为确保您在移动时不会丢失任何记录,请改用它:
function insertBatch(collection, documents) {
var bulkInsert = collection.initializeUnorderedBulkOp();
var insertedIds = [];
var id;
documents.forEach(function(doc) {
id = doc._id;
// Insert without raising an error for duplicates
bulkInsert.find({_id: id}).upsert().replaceOne(doc);
insertedIds.push(id);
});
bulkInsert.execute();
return insertedIds;
}
function deleteBatch(collection, documents) {
var bulkRemove = collection.initializeUnorderedBulkOp();
documents.forEach(function(doc) {
bulkRemove.find({_id: doc._id}).removeOne();
});
bulkRemove.execute();
}
function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
var count;
while ((count = sourceCollection.find(filter).count()) > 0) {
print(count + " documents remaining");
sourceDocs = sourceCollection.find(filter).limit(batchSize);
idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);
targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
deleteBatch(sourceCollection, targetDocs);
}
print("Done!")
}
【讨论】:
$out 用于创建带有数据的新集合,因此请使用 $out
db.oldCollection.aggregate([{$out : "newCollection"}])
然后使用drop
db.oldCollection.drop()
【讨论】:
我确实喜欢 @markus-w-mahlberg 的回复,但有时,我看到有必要让它对人们来说更简单一些。因此,我有以下几个功能。你可以像他一样自然地用批量操作符在这里包装东西,但是这段代码同样适用于新旧 Mongo 系统。
function parseNS(ns){
//Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array.
if (ns instanceof Array){
database = ns[0];
collection = ns[1];
}
else{
tNS = ns.split(".");
if (tNS.length > 2){
print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !');
return false;
}
database = tNS[0];
collection = tNS[1];
}
return {database: database,collection: collection};
}
function insertFromCollection( sourceNS, destNS, query, batchSize, pauseMS){
//Parse and check namespaces
srcNS = parseNS(sourceNS);
destNS = parseNS(destNS);
if ( srcNS == false || destNS == false){return false;}
batchBucket = new Array();
totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
currentCount = 0;
print("Processed "+currentCount+"/"+totalToProcess+"...");
db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
batchBucket.push(doc);
if ( batchBucket.length > batchSize){
db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket);
currentCount += batchBucket.length;
batchBucket = [];
sleep (pauseMS);
print("Processed "+currentCount+"/"+totalToProcess+"...");
}
}
print("Completed");
}
/** Example Usage:
insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);
您显然可以添加db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true)
如果您还想在将记录复制到新位置后删除它们。可以很容易地像这样构建代码以使其可重新启动。
【讨论】:
【讨论】:
copyTo 自 3.0 版起已弃用:docs.mongodb.com/manual/reference/method/db.collection.copyTo 此外,它不支持过滤器。
您可以使用范围查询从 sourceCollection 获取数据并将游标数据保存在变量中并在其上循环并插入到目标集合:
var doc = db.sourceCollection.find({
"Timestamp":{
$gte:ISODate("2014-09-01T00:00:00Z"),
$lt:ISODate("2014-10-01T00:00:00Z")
}
});
doc.forEach(function(doc){
db.targetCollection.insert(doc);
})
希望对你有帮助!!
【讨论】:
这是对@Markus W Mahlberg 的重述
回报 - 作为一项功能
function moveDocuments(sourceCollection,targetCollection,filter) {
var bulkInsert = targetCollection.initializeUnorderedBulkOp();
var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
sourceCollection.find(filter)
.forEach(function(doc) {
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
bulkInsert.execute();
bulkRemove.execute();
}
使用示例
var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)
将所有具有*元素 dsid 的文档从图片移动到工件集合
【讨论】:
可能从性能的角度来看,最好使用一个命令删除大量文档(特别是如果您有查询部分的索引)而不是一个一个地删除它们。
例如:
db.source.find({$gte: start, $lt: end}).forEach(function(doc){
db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});
【讨论】: