【问题标题】:Bulk upsert in MongoDB using mongoose使用 mongoose 在 MongoDB 中批量更新插入
【发布时间】:2014-10-06 18:11:31
【问题描述】:

是否有任何选项可以使用 mongoose 执行批量 upserts?所以基本上有一个数组,如果它不存在则插入每个元素,或者如果它存在则更新它? (我正在使用海关_ids)

当我使用 .insert 时,MongoDB 会为重复键返回错误 E11000(应该更新)。插入多个新文档可以正常工作:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

使用.save会返回参数必须是单个文档的错误:

Users.save(data, function(err){
   ...
}

This answer 建议没有这样的选项,但是它特定于 C# 并且已经 3 岁了。所以我想知道是否有任何选择可以使用猫鼬来做到这一点?

谢谢!

【问题讨论】:

  • 批量更新插入是什么意思?如果未找到要更新的文档,则 update upsert 标志如果设置为 true,则会创建一个新文档。 docs.mongodb.org/manual/reference/glossary/#term-upsert
  • @joao 可能在给出的答案中提到的“批量”操作 API 中提到。

标签: javascript node.js mongodb mongoose mongodb-query


【解决方案1】:

没有具体在“猫鼬”中,或者至少在撰写本文时还没有。 2.6 版的 MongoDB shell 实际上使用 "Bulk operations API"“幕后”,因为它用于所有通用辅助方法。在它的实现中,它首先尝试执行此操作,如果检测到旧版本的服务器,则会“回退”到旧的实现。

“当前”的所有 mongoose 方法都使用“旧版”实现或写入关注响应和基本的旧版方法。但是任何给定的 mongoose 模型都有一个 .collection 访问器,它本质上是从实现 mongoose 的底层“节点本机驱动程序”访问“集合对象”:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

主要的问题是“猫鼬方法”实际上知道连接可能尚未真正建立,并且“排队”直到完成。您正在“挖掘”的本机驱动程序没有这种区别。

因此,您确实必须意识到连接是以某种方式或形式建立的。但是你可以使用本机驱动方法,只要你小心你在做什么。

【讨论】:

  • 谢谢!这很好用。我本来会喜欢 joao 的方法,但我没有设法用 .update() 上传多个文档......当然我可以在 for 循环中完成,但我想批量上传更有效?还是因为数据库连接仍然打开,所以没有区别?
  • @user3122267 Upsert ant Bulk 基本上是“粉笔和奶酪”,不一样甚至不相近。 “upsert”创建一个不存在的新文档,并且“Bulk”是批量操作。另一个选项是“multi”,因为.update() 默认只会修改“第一个”找到的文档。喜欢这种方法吗?看到一无所知的评论者与真正有知识的回答者之间的巨大差异吗?
  • @zstew 提出新问题的正确位置是提出另一个问题,而不是评论旧帖子。您似乎错过了此答案末尾的陈述。如果您仍然不明白这意味着什么,请再问一个问题。
  • 我注意到这个答案和@konsumer 同步循环所有记录。我很好奇在一个刻度中创建 10 个 bulk 操作与在 10 个单独的刻度中创建 10 个 bulk 操作的性能差异(就 Node 中的内存使用而言)。
  • @joeytwiddle “批量”操作在您调用 .execute() 之前不是异步的。目的是服务器的任何“来回”都会消耗 IO,因此您正在尝试将其最小化。的确,在同步循环中,您可能会多次发生.execute() 并使用多个连接。但是您可以使用 async.whilst 或其他控件来更改它,其中迭代可以由回调控制(因此在 .execute() 内部)以处理完成。用 Promise 来做这件事有点困难,但仍然可能。
【解决方案2】:

您不需要像@neil-lunn 建议的那样管理限制(1000)。猫鼬已经这样做了。我以他的出色回答作为这个完整的基于 Promise 的实施和示例的基础:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

这样做的好处是在连接完成后关闭连接,如果您关心则显示任何错误,但如果不关心则忽略它们(Promises 中的错误回调是可选的。)它也非常快。只是把这个留在这里分享我的发现。例如,如果要将所有 eztv 节目保存到数据库中,可以取消注释 eztv 内容。

【讨论】:

  • 这不会消耗更多内存吗?
  • 消耗的内存多于什么?
  • 是的。这就是bulk.execute 所做的。 docs.mongodb.org/v3.0/reference/method/…
  • @ECMAScript 实际上,Neil 和 konsumer 的建议都消耗了相似数量的 Node 内存,因为这两种技术都在不断创建文档而不等待 Mongo 响应。显然,如果您打算插入的文档超出 RAM 的容量,这只是一个问题。
  • @PirateApp 也许你的内存不足来保存结构?你得到什么错误?如果您没有足够的内存来保存它,您可能必须使用串行 promise 一个接一个地运行它们或批量运行它们。
【解决方案3】:
await Model.bulkWrite(docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
})))


或更详细:

const bulkOps = docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
}))

Model.bulkWrite(bulkOps)
        .then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
        .catch(err => console.error('BULK update error:', err))

https://stackoverflow.com/a/60330161/5318303

【讨论】:

    【解决方案4】:

    我已经发布了一个用于 Mongoose 的插件,它公开了一个静态的 upsertMany 方法来使用一个 Promise 接口执行批量 upsert 操作。

    与在底层集合上初始化您自己的批量操作相比,使用此插件的另一个好处是,此插件首先将您的数据转换为 Mongoose 模型的数据,然后在 upsert 之前转换回普通对象。这可确保应用 Mongoose 模式验证,并且数据已被取消填充并适合原始插入。

    https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many

    希望对你有帮助!

    【讨论】:

      【解决方案5】:

      如果您没有在 db.collection 中看到批量方法,即您收到的错误是 xxx 变量没有方法:initializeOrderedBulkOp()

      尝试更新您的猫鼬版本。显然,较旧的 mongoose 版本不会通过所有底层 mongo db.collection 方法。

      npm 安装猫鼬

      帮我处理好了。

      【讨论】:

        【解决方案6】:

        我最近在我的电子商务应用中存储产品时必须实现这一点。我的数据库曾经超时,因为我必须每 4 小时更新 10000 个项目。对我来说,一种选择是在连接到数据库时在猫鼬中设置 socketTimeoutMS 和 connectTimeoutMS,但它有点 hacky,我不想操纵数据库的连接超时默认值。我还看到@neil lunn 的解决方案采用了一种简单的同步方法,即在 for 循环中取模。这是我的一个异步版本,我相信它做得更好

        let BATCH_SIZE = 500
        Array.prototype.chunk = function (groupsize) {
            var sets = [];
            var chunks = this.length / groupsize;
        
            for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) {
                sets[i] = this.slice(j, j + groupsize);
            }
        
            return sets;
        }
        
        function upsertDiscountedProducts(products) {
        
            //Take the input array of products and divide it into chunks of BATCH_SIZE
        
            let chunks = products.chunk(BATCH_SIZE), current = 0
        
            console.log('Number of chunks ', chunks.length)
        
            let bulk = models.Product.collection.initializeUnorderedBulkOp();
        
            //Get the current time as timestamp
            let timestamp = new Date(),
        
                //Keep track of the number of items being looped
                pendingCount = 0,
                inserted = 0,
                upserted = 0,
                matched = 0,
                modified = 0,
                removed = 0,
        
                //If atleast one upsert was performed
                upsertHappened = false;
        
            //Call the load function to get started
            load()
            function load() {
        
                //If we have a chunk to process
                if (current < chunks.length) {
                    console.log('Current value ', current)
        
                    for (let i = 0; i < chunks[current].length; i++) {
                        //For each item set the updated timestamp to the current time
                        let item = chunks[current][i]
        
                        //Set the updated timestamp on each item
                        item.updatedAt = timestamp;
        
                        bulk.find({ _id: item._id })
                            .upsert()
                            .updateOne({
                                "$set": item,
        
                                //If the item is being newly inserted, set a created timestamp on it
                                "$setOnInsert": {
                                    "createdAt": timestamp
                                }
                            })
                    }
        
                    //Execute the bulk operation for the current chunk
                    bulk.execute((error, result) => {
                        if (error) {
                            console.error('Error while inserting products' + JSON.stringify(error))
                            next()
                        }
                        else {
        
                            //Atleast one upsert has happened
                            upsertHappened = true;
                            inserted += result.nInserted
                            upserted += result.nUpserted
                            matched += result.nMatched
                            modified += result.nModified
                            removed += result.nRemoved
        
                            //Move to the next chunk
                            next()
                        }
                    })
        
        
        
                }
                else {
                    console.log("Calling finish")
                    finish()
                }
        
            }
        
            function next() {
                current++;
        
                //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
                bulk = models.Product.collection.initializeUnorderedBulkOp();
                setTimeout(load, 0)
            }
        
            function finish() {
        
                console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed)
        
                //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert
                if (upsertHappened) {
                    console.log("Calling remove")
                    remove()
                }
        
        
            }
        
            /**
             * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
             */
            function remove() {
        
                models.Product.remove(
                    {
                        "$or":
                        [{
                            "updatedAt": { "$lt": timestamp }
                        },
                        {
                            "discount": { "$eq": 0 }
                        }]
                    }, (error, obj) => {
                        if (error) {
                            console.log('Error while removing', JSON.stringify(error))
                        }
                        else {
                            if (obj.result.n === 0) {
                                console.log('Nothing was removed')
                            } else {
                                console.log('Removed ' + obj.result.n + ' documents')
                            }
                        }
                    }
                )
            }
        }
        

        【讨论】:

        • @neil-lunn 你的解决方案,如果我没记错的话,会一起创建多个批量对象并且它们都异步执行,但我让它在我的解决方案中的给定时间只有一个 bulk.execute
        • 据我了解,您正在串行处理批次。我认为确保内存不会过载是正确的。但是一次只有一批,有时你的数据库会等待网络,有时网络会等待 CPU。并行运行 5-10 个较小的批次(每次完成较早的批次时,都会依次启动一个新批次),通过确保系统中可以工作的所有部分都在工作,可能会略微增加吞吐量。
        【解决方案7】:

        你可以使用mongoose的Model.bulkWrite()

        const res = await Character.bulkWrite([
          {
            updateOne: {
              filter: { name: 'Will Riker' },
              update: { age: 29 },
              upsert: true
            }
          },
          {
            updateOne: {
              filter: { name: 'Geordi La Forge' },
              update: { age: 29 },
              upsert: true
            }
          }
        ]);
        

        参考:https://masteringjs.io/tutorials/mongoose/upsert

        【讨论】:

          猜你喜欢
          • 2016-09-19
          • 2016-03-15
          • 2018-03-24
          • 1970-01-01
          • 1970-01-01
          • 2017-07-12
          • 2015-10-20
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多