【问题标题】:mongodb move documents from one collection to another collectionmongodb 将文档从一个集合移动到另一个集合
【发布时间】:2026-01-14 07:35:01
【问题描述】:

文档如何在MongoDB从一个集合移动到另一个集合?例如:我在集合 A 中有很多文档,我想将所有 1 个月前的文档移到集合 B(这些 1 个月前的文档不应该在集合 A 中)。

使用聚合,我们可以进行复制。但我想做的是移动文档。 可以用什么方法来移动文档?

【问题讨论】:

    标签: mongodb


    【解决方案1】:

    第一个选项(使用 mongo 转储)

    1.从集合中获取转储

    mongodump -d db -c source_collection

    2.从收藏中恢复

    mongorestore -d db -c target_collection dir=dump/db_name/source_collection.bson

    第二个选项

    运行聚合

    db.getCollection('source_collection').aggregate([ { $match: {"emailAddress" : "apitester@mailinator.com"} }, { $out: "target_collection" } ])

    第三个​​选项(最慢)

    运行一个for循环

    db.getCollection('source_collection').find().forEach(function(docs){ db.getCollection('target_collection').insert(docs); }) print("回滚完成!");

    【讨论】:

      【解决方案2】:

      在我的情况下,每个都不起作用。所以我不得不做出一些改变。

      var kittySchema = new mongoose.Schema({
      name: String
      });
      
      var Kitten = mongoose.model('Kitten', kittySchema);
      
      var catSchema = new mongoose.Schema({
      name: String
      });
      
      var Cat = mongoose.model('Cat', catSchema);
      

      这是两个集合的模型

      `function Recursion(){
      Kitten.findOne().lean().exec(function(error, results){
          if(!error){
              var objectResponse = results;
              var RequiredId = objectResponse._id;
              delete objectResponse._id;
              var swap = new Cat(objectResponse);
              swap.save(function (err) {
                 if (err) {
                     return err;
                 }
                 else {
                     console.log("SUCCESSFULL");
                     Kitten.deleteOne({ _id: RequiredId }, function(err) {
                      if (!err) {
                              console.log('notification!');
                      }
                      else {
                              return err;
                      }
                  });
                     Recursion();
                 }
              });
          }
          if (err) {
              console.log("No object found");
              // return err;
          }
      })
      }`
      

      【讨论】:

        【解决方案3】:

        这是对@jasongarber 答案的更新,它使用了更新的 mongo 'bulkWrite' 操作(Read docs here),并且还保持整个过程异步,因此您可以将它作为更广泛的脚本的一部分运行,这取决于它的完成情况。

        async function moveDocuments (sourceCollection, targetCollection, filter) {
          const sourceDocs = await sourceCollection.find(filter)
        
          console.log(`Moving ${await sourceDocs.count()} documents from ${sourceCollection.collectionName} to ${targetCollection.collectionName}`)
        
          const idsOfCopiedDocs = await insertDocuments(targetCollection, sourceDocs)
        
          const targetDocs = await targetCollection.find({_id: {$in: idsOfCopiedDocs}})
          await deleteDocuments(sourceCollection, targetDocs)
        
          console.log('Done!')
        }
        
        async function insertDocuments (collection, documents) {
          const insertedIds = []
          const bulkWrites = []
        
          await documents.forEach(doc => {
            const {_id} = doc
        
            insertedIds.push(_id)
            bulkWrites.push({
              replaceOne: {
                filter: {_id},
                replacement: doc,
                upsert: true,
              },
            })
          })
        
          if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
        
          return insertedIds
        }
        
        async function deleteDocuments (collection, documents) {
          const bulkWrites = []
        
          await documents.forEach(({_id}) => {
            bulkWrites.push({
              deleteOne: {
                filter: {_id},
              },
            })
          })
        
          if (bulkWrites.length) await collection.bulkWrite(bulkWrites, {ordered: false})
        }
        

        【讨论】:

          【解决方案4】:

          插入和删除:

          var documentsToMove = db.collectionA.find({});
          documentsToMove.forEach(function(doc) {
              db.collectionB.insert(doc);
              db.collectionA.remove(doc);
          });
          

          注意:对于大型集合或包含大型文档的集合,此方法可能会很慢。

          【讨论】:

          • insert() 和 remove() 是优化的解决方案吗??
          • 不知道 :) 您也可以使用管理工具进行转储和恢复。
          • 这不是原子的,有可能在集合 B 中插入某些东西,而不是从 A 中删除。
          • 最后一行应该是}); 而不仅仅是}。缺少右括号。
          【解决方案5】:

          更新 2

          请不要再赞成这个答案了。正如所写的@jasongarber's answer 在任何方面都更好。

          更新

          This answer by @jasongarber 是一种更安全的方法,应该代替我使用。


          如果我做对了,并且您想移动所有超过 1 个月的文档,并且您使用的是 mongoDB 2.6,那么没有理由不使用批量操作,这是我所知道的进行多项操作的最有效方式:

          > var bulkInsert = db.target.initializeUnorderedBulkOp()
          > var bulkRemove = db.source.initializeUnorderedBulkOp()
          > var date = new Date()
          > date.setMonth(date.getMonth() -1)
          > db.source.find({"yourDateField":{$lt: date}}).forEach(
              function(doc){
                bulkInsert.insert(doc);
                bulkRemove.find({_id:doc._id}).removeOne();
              }
            )
          > bulkInsert.execute()
          > bulkRemove.execute()
          

          这应该很快,它的优点是万一在批量插入过程中出现问题,原始数据仍然存在。


          编辑

          为了防止内存占用过多,您可以对每个处理的x docs 执行批量操作:

          > var bulkInsert = db.target.initializeUnorderedBulkOp()
          > var bulkRemove = db.source.initializeUnorderedBulkOp()
          > var x = 10000
          > var counter = 0
          > var date = new Date()
          > date.setMonth(date.getMonth() -1)
          > db.source.find({"yourDateField":{$lt: date}}).forEach(
              function(doc){
                bulkInsert.insert(doc);
                bulkRemove.find({_id:doc._id}).removeOne();
                counter ++
                if( counter % x == 0){
                  bulkInsert.execute()
                  bulkRemove.execute()
                  bulkInsert = db.target.initializeUnorderedBulkOp()
                  bulkRemove = db.source.initializeUnorderedBulkOp()
                }
              }
            )
          > bulkInsert.execute()
          > bulkRemove.execute()
          

          【讨论】:

          • 或在 Robomongo 等 UI 工具中 db.getCollection('source').find({}).forEach(function(doc) { db.getCollection('target').insert(doc); db.getCollection('source').remove(doc);})
          • @Arthur:您的方法有两个主要缺点。它要慢得多 并且在最坏的情况下,您可能有不完整的集合难以再次同步。
          • 这对我不起作用。我在一个有 50M 记录的集合上尝试了这个,并试图移出大约 25M。查找查询失败,错误为Fatal error in CALL_AND_RETRY_2 # Allocation failed - process out of memory。这是在具有 32GB 内存的服务器上,记录只有 5 个字段。集合的总数据大小只有 5GB 左右。
          • 您的意思是限制查找查询吗?我什至没有执行任何操作,在 foreach 期间发生了内存不足错误。如果批量功能只能处理 10k 块,那么它们就不能真正达到目的:/(对不起,不是在向你发泄,我感谢你的帮助,只是感到有点沮丧!)
          • @UpTheCreek 查看编辑。我只是想排除我们有记忆问题。因为听起来就是这样。可能由操作系统强加。
          【解决方案6】:

          我有 1500 万个文档的 2297 个集合,但有些集合是空的。

          仅使用 copyTo 脚本失败,但使用此脚本优化:

          db.getCollectionNames().forEach(function(collname) {
              var c = db.getCollection(collname).count();
              if(c!==0){
                db.getCollection(collname).copyTo('master-collection');
                print('Copied collection ' + collname);
              }
          });
          

          对我来说一切都很好。

          注意:不推荐使用 copyTo,因为它会阻止读/写操作:所以如果您知道数据库在此操作期间不可用,我认为没问题。

          【讨论】:

            【解决方案7】:

            我计划使用 pymongo 的 bulkinsert 和 bulkdelete 方法一次归档 1000 条记录。

            对于源和目标

            1. 创建 mongodb 对象以连接到数据库。

            2. 实例化批量对象。注意:我也创建了批量对象的备份。这将帮助我在发生错误时回滚插入或删除。 示例:

              对于来源 // replace this with mongodb object creation logic source_db_obj = db_help.create_db_obj(source_db, source_col) source_bulk = source_db_obj.initialize_ordered_bulk_op() source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
              对于目标 // replace this with mogodb object creation logic target_db_obj = db_help.create_db_obj(target_db, target_col) target_bulk = target_db_obj.initialize_ordered_bulk_op() target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()

            3. 获取符合过滤条件的源记录

              source_find_results = source_db_obj.find(过滤器)

            4. 遍历源记录

              创建目标和源批量操作

              将带有当前日期时间的 archived_at 字段附加到目标集合

              //replace this with the logic to obtain the UTCtime. doc['archived_at'] = db_help.getUTCTime() target_bulk.insert(document) source_bulk.remove(document)

              为了在出现任何错误或异常的情况下进行回滚,请创建 target_bulk_bak 和 source_bulk_bak 操作。

              target_bulk_bak.find({'_id':doc['_id']}).remove_one() source_bulk_bak.insert(doc) //remove the extra column doc.pop('archieved_at', None)

            5. 当记录数达到1000时,执行目标-批量插入和源-批量删除。注意:此方法需要 target_bulk 和 source_bulk 对象来执行。

              execute_bulk_insert_remove(source_bulk, target_bulk)

            6. 发生异常时,执行 target_bulk_bak 移除和 source_bulk_bak 插入。这将回滚更改。由于 mongodb 没有回滚功能,所以我想出了这个 hack

              execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)

            7. 最后重新初始化源和目标 bulk 和 bulk_bak 对象。这是必要的,因为您只能使用它们一次。

            8. 完整代码

                  def execute_bulk_insert_remove(source_bulk, target_bulk):
                      try:
                          target_bulk.execute()
                          source_bulk.execute()
                      except BulkWriteError as bwe:
                          raise Exception(
                              "could not archive document, reason:    {}".format(bwe.details))
              
                  def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col):
                      """
                      filter: filter criteria for backup
                      source_db: source database name
                      source_col: source collection name
                      target_db: target database name
                      target_col: target collection name
                      """
                      count = 0
                      bulk_count = 1000
              
                      source_db_obj = db_help.create_db_obj(source_db, source_col)
                      source_bulk = source_db_obj.initialize_ordered_bulk_op()
                      source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
              
                      target_db_obj = db_help.create_db_obj(target_db, target_col)
                      target_bulk = target_db_obj.initialize_ordered_bulk_op()
                      target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
              
                      source_find_results = source_db_obj.find(filter)
              
                      start = datetime.now()
              
                      for doc in source_find_results:
                          doc['archived_at'] = db_help.getUTCTime()
              
                          target_bulk.insert(doc)
                          source_bulk.find({'_id': doc['_id']}).remove_one()
                          target_bulk_bak.find({'_id': doc['_id']}).remove_one()
                          doc.pop('archieved_at', None)
                          source_bulk_bak.insert(doc)
              
                          count += 1
              
                          if count % 1000 == 0:
                              logger.info("count: {}".format(count))
                              try:
                                  execute_bulk_insert_remove(source_bulk, target_bulk)
                              except BulkWriteError as bwe:
                                  execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
                                  logger.info("Bulk Write Error: {}".format(bwe.details))
                                  raise
              
                              source_bulk = source_db_obj.initialize_ordered_bulk_op()
                              source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
              
                              target_bulk = target_db_obj.initialize_ordered_bulk_op()
                              target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
              
                      end = datetime.now()
              
                      logger.info("archived {} documents to {} in ms.".format(
                          count, target_col, (end - start)))
              

            【讨论】:

            • 您好,欢迎来到 Stack Overflow!花点时间阅读How to Answer - 这看起来很有帮助,但它会受益于对代码作用的一些解释,考虑edit-ing 吗?
            【解决方案8】:

            @markus-w-mahlberg 显示的批量操作(和 @mark-mullin 精炼)是有效的,但在编写时不安全。如果 bulkInsert 失败,bulkRemove 仍将继续。为确保您在移动时不会丢失任何记录,请改用它:

            function insertBatch(collection, documents) {
              var bulkInsert = collection.initializeUnorderedBulkOp();
              var insertedIds = [];
              var id;
              documents.forEach(function(doc) {
                id = doc._id;
                // Insert without raising an error for duplicates
                bulkInsert.find({_id: id}).upsert().replaceOne(doc);
                insertedIds.push(id);
              });
              bulkInsert.execute();
              return insertedIds;
            }
            
            function deleteBatch(collection, documents) {
              var bulkRemove = collection.initializeUnorderedBulkOp();
              documents.forEach(function(doc) {
                bulkRemove.find({_id: doc._id}).removeOne();
              });
              bulkRemove.execute();
            }
            
            function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
              print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
              var count;
              while ((count = sourceCollection.find(filter).count()) > 0) {
                print(count + " documents remaining");
                sourceDocs = sourceCollection.find(filter).limit(batchSize);
                idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);
            
                targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
                deleteBatch(sourceCollection, targetDocs);
              }
              print("Done!")
            }

            【讨论】:

            • 此过程将 id 保存在目标和源中。 (我的情况与帖子有点不同,我偶然发现了这个功能。)
            • 对此处发布的此内容进行异步更新*.com/a/54715607/7583056
            • 你能提供一个运行 moveDocuments 的例子吗?
            • 需要注意的是,这不会从源集合中释放空间。
            • 这是 mongosh 吗?非常酷的解决方案。现在的任务是用 pymongo 重写它。
            【解决方案9】:

            $out 用于创建带有数据的新集合,因此请使用 $out

            db.oldCollection.aggregate([{$out : "newCollection"}])
            

            然后使用drop

            db.oldCollection.drop()
            

            【讨论】:

            • 请记住,如果它已经以该名称存在,这将覆盖整个集合(而不是从旧集合中附加匹配的文档)!
            【解决方案10】:

            我确实喜欢 @markus-w-mahlberg 的回复,但有时,我看到有必要让它对人们来说更简单一些。因此,我有以下几个功能。你可以像他一样自然地用批量操作符在这里包装东西,但是这段代码同样适用于新旧 Mongo 系统。

            function parseNS(ns){
                //Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array.
                if (ns instanceof Array){
                    database =  ns[0];
                    collection = ns[1];
                }
                else{
                    tNS =  ns.split(".");
                    if (tNS.length > 2){
                        print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !');
                        return false;
                    }
                    database = tNS[0];
                    collection = tNS[1];
                }
                return {database: database,collection: collection};
            }
            
            function insertFromCollection( sourceNS,  destNS, query, batchSize, pauseMS){
                //Parse and check namespaces
                srcNS = parseNS(sourceNS);
                destNS = parseNS(destNS);
                if ( srcNS == false ||  destNS == false){return false;}
            
                batchBucket = new Array();
                totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
                currentCount = 0;
                print("Processed "+currentCount+"/"+totalToProcess+"...");
                db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
                    batchBucket.push(doc);
                    if ( batchBucket.length > batchSize){
                        db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket);
                        currentCount += batchBucket.length;
                        batchBucket = [];
                        sleep (pauseMS);
                        print("Processed "+currentCount+"/"+totalToProcess+"...");       
                    }
                }
                print("Completed");
            }
            
            /** Example Usage:
                    insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);    
            

            您显然可以添加db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true) 如果您还想在将记录复制到新位置后删除它们。可以很容易地像这样构建代码以使其可重新启动。

            【讨论】:

              【解决方案11】:

              从 MongoDB 3.0 起,您可以使用 copyTo 命令,语法如下:

              db.source_collection.copyTo("target_collection")
              

              然后你可以使用drop命令删除旧集合:

              db.source_collection.drop()
              

              【讨论】:

              【解决方案12】:

              您可以使用范围查询从 sourceCollection 获取数据并将游标数据保存在变量中并在其上循环并插入到目标集合:

               var doc = db.sourceCollection.find({
                      "Timestamp":{
                            $gte:ISODate("2014-09-01T00:00:00Z"),
                            $lt:ISODate("2014-10-01T00:00:00Z")
                      }
               });
              
               doc.forEach(function(doc){
                  db.targetCollection.insert(doc);
               })
              

              希望对你有帮助!!

              【讨论】:

              【解决方案13】:

              这是对@Markus W Mahlberg 的重述

              回报 - 作为一项功能

              function moveDocuments(sourceCollection,targetCollection,filter) {
                  var bulkInsert = targetCollection.initializeUnorderedBulkOp();
                  var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
                  sourceCollection.find(filter)
                      .forEach(function(doc) {
                      bulkInsert.insert(doc);
                      bulkRemove.find({_id:doc._id}).removeOne();
                      }
                )
                bulkInsert.execute();
                bulkRemove.execute();
              }
              

              使用示例

              var x = {dsid:{$exists: true}};
              moveDocuments(db.pictures,db.artifacts,x)
              

              将所有具有*元素 dsid 的文档从图片移动到工件集合

              【讨论】:

                【解决方案14】:

                可能从性能的角度来看,最好使用一个命令删除大量文档(特别是如果您有查询部分的索引)而不是一个一个地删除它们。

                例如:

                db.source.find({$gte: start, $lt: end}).forEach(function(doc){
                   db.target.insert(doc);
                });
                db.source.remove({$gte: start, $lt: end});
                

                【讨论】: