【问题标题】:Incremental UnorderedBulkOp calls to 'execute' are exponentially slower对“执行”的增量 UnorderedBulkOp 调用速度呈指数增长
【发布时间】:2023-04-04 14:38:01
【问题描述】:

我正在尝试使用新的 mongo 批量 API 编写自定义批量上传脚本。我正在使用UnorderedBulkOp,它最初工作得非常快,但在被调用几次后它开始挂起。我尝试过使用日志行,似乎在第 10 次通话之后它才真正开始爆炸。如果我停止上传并重新启动它(有用于检查欺骗的代码),对execute 的前几次调用再次执行,因此它似乎不依赖于我收集的数据量。到底是怎么回事?我曾想过将所有操作推送到批量操作并只调用一次执行,但在此处看到另一个答案是在批量操作上逐步调用 execute

精简了一下,它是这样做的:

this.db.collection(collection_name, function(err, collection){
  var bulk = collection.initializeUnorderedBulkOp();
  var operations = 0;
  var dataread = fs.createReadStream(filepath, {encoding: 'utf8'});
  var current = '';

  // load and split data from CSV     
  dataread.on('data', function(data){
    dataread.pause();
    chunk = current + data;
    var split = chunk.split('\n');
    current = split.pop();
    var ids = [];

    for(i=0, len = split.length; i< len; i++){
      lineData = split[i].split(',');
      customid = parseInt(lineData[0]);
      ids.push(customid);
    }

    // find which docs already exist and need to be updated     
    collection.find({customid: {$in: ids}}).toArray(function(err, docs){
      var docmap = {};
      for(i=0, len=docs.length; i<len; i++){
        docmap[docs[i].customid] = docs[i]; 
      }

      for(isplit=0; isplit<split.length; isplit++){
        lineData = split[isplit].split(',');
        customid = parseInt(lineData[0]);

        // check for insert or update
        if(docmap[customid]){
          doc = docmap[customid];
          //update doc
          bulk.find({_id: doc._id}).update({$push: {history: 1}});    
        else{
          doc = formatData(lineData);
          bulk.insert(doc);
        }
        operations++;         
      }

      if(operations > 10000){
        bulk.execute({w: 1}, function(err, result){
          operations = 0;
          dataread.resume();
        });
      }else{
        dataread.resume();
      }    
    });
  });
});

最初我是通过单独调用 collection.save 来执行此操作的,但我的数据集目前大约有 200 万个数据点,我正在寻求优化,因为我将每周运行一次此上传。

【问题讨论】:

    标签: node.js mongodb bulkinsert bulk-load mongojs


    【解决方案1】:

    看来这是批量操作的一个缺点。尽管它处理顺序批量操作,但单个 BulkOp 对象最适合仅处理单个批处理过程。我设法通过在成功的execute 调用的回调中重新初始化bulk 来解决这个问题。

    bulk.execute({w: 1}, function(err, result){
       operations = 0;
       bulk = collection.initializeUnorderedBulkOp();
       dataread.resume();
    });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-06-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多