【问题标题】:How to perform mass inserts into mongodb using NodeJS如何使用 NodeJS 对 mongodb 执行批量插入
【发布时间】:2015-08-17 11:28:10
【问题描述】:

我必须使用 nodejs 在 mongodb 中插入大约 10,00000 个文档。

我使用 for 循环生成这些文档,然后将它们存储到数组中,然后最终将它们插入 mongodb。

var codeArray = new Array();
for (var i = 0; i<1000000; i++){
    var token = strNpm.generate();
    var now = moment().format('YYYYMMDD hhmmss');
    var doc1 = {id:token,
        Discount_strId:"pending",
        Promotion_strCode:token,
        Promotion_strStatus:"I",
        Promotion_dtmGeneratedDate:now,
        User_strLogin:"test",
        Promotion_strMode:"S",
        Promotion_dtmValidFrom:"pending",
        Promotion_dtmValidTill:"pending",
        LastModified_dtmStamp:now
    };
    codeArray.push(doc1);
    db.collection('ClPromoCodeMaster').insert(codeArray, function (err, result) {
    if (err){
        console.log(err);
    }else{
        console.log('Inserted Records - ', result.ops.length);
    }
});

我面临的问题是 mongo 的插入限制为 16mb,所以我不能一次插入整个数组。 请提出最优化的解决方案。

【问题讨论】:

  • 16 MB 是文档大小限制,我不认为你可以一次性插入多少。
  • 是的!但是我的数组已经超过了 16 MB 的限制。所以无法将其插入数据库。
  • 不推荐使用insert...文档建议改用insertOneinsertMany。尝试使用insertManymongodb.github.io/node-mongodb-native/2.0/api/…
  • @PrashanthChandra 仍然是相同的限制。发送的“请求”仍然需要低于 BSON 16MB 限制。这里的关键是不要一次发送所有内容,并且也要尊重回调。
  • @PrashanthChandra 是的。一切都是 BSON,无论是“在线”还是“存储”。

标签: node.js mongodb mongodb-query


【解决方案1】:

主要问题在于请求大小而不是文档大小,但它具有相同的限制。 Bulk operations 和带有 async.whilst 的异步库将处理这个问题:

var bulk = db.collection('ClPromoCodeMaster').initializeOrderedBulkOp(),
    i = 0;

async.whilst(
  function() { return i < 1000000; },
  function(callback) {
    var token = strNpm.generate();
    var now = moment().format('YYYYMMDD hhmmss');
    var doc = {
      id:token,
      Discount_strId:"pending",
      Promotion_strCode:token,
      Promotion_strStatus:"I",
      Promotion_dtmGeneratedDate:now,
      User_strLogin:"test",
      Promotion_strMode:"S",
      Promotion_dtmValidFrom:"pending",
      Promotion_dtmValidTill:"pending",
      LastModified_dtmStamp:now
    };

    bulk.insert(doc);
    i++;

    // Drain every 1000
    if ( i % 1000 == 0 ) {
      bulk.execute(function(err,response){
        bulk = db.collection('ClPromoCodeMaster').initializeOrderedBulkOp();
        callback(err);
      });
    } else {
        callback();
    }

  },
  function(err) {
    if (err) throw err;
    console.log("done");
  }
);

我应该注意,无论批量操作的内部限制为每批 1000 次操作。您可以提交更大的尺寸,但驱动程序只是将它们分解并仍然以 1000 个批量提交。

尽管如此,1000 是一个不错的数字,因为它已经与处理请求的方式一致,并且在耗尽请求队列并发送到服务器。

【讨论】:

    【解决方案2】:

    为了一次插入数百万条记录,使用 MongoDb 批量 API 创建 node.js 子进程分叉。

    子进程创建:(index.js)

    const {fork} = require("child_process");
    let counter = 1;
    
    function createProcess(data){
        const worker =  fork("./dbOperation");    
        worker.send(data);    
        worker.on("message", (msg) => {        
            console.log("Worker Message :",counter, msg);
            counter++;
        })
    
    }
    
    function bulkSaveUser(records) {
        const singleBatchCount = 10000; // Save 10,000 records per hit
        const noOfProcess = Math.ceil(records/singleBatchCount);
        let data = {};
        console.log("No of Process :", noOfProcess);
        for(let index = 1; index <= noOfProcess; index++) {       
            data.startCount = (index == 1) ? index : (((index - 1) * singleBatchCount) + 1); 
            data.endCount = index * singleBatchCount;
            createProcess(data);
        }
    } 
    
    
    bulkSaveUser(1500000);
    

    数据库操作 (dbOperation.js)

    const MongoClient = require('mongodb').MongoClient;
    // Collection Name
    const collectionName = ""; 
    // DB Connection String
    const connString = "";
    
    process.on("message", (msg) => {
        console.log("Initialize Child Process", msg)
        const {startCount, endCount} = msg;
        inputStudents(startCount, endCount);
    });
    
    function initConnection() {
        return new Promise(function(r, e) {
            MongoClient.connect(connString, function(err, db) {
                if (err) e(err)            
                r(db);
            });
        });
    }
    
    function inputStudents(startCount, endCount) {    
    
        let bulkData = [];
        for(let index = startCount; index <= endCount; index++ ){ 
            var types = ['exam', 'quiz', 'homework', 'homework'];
            let scores = []
            // and each class has 4 grades
            for (j = 0; j < 4; j++) {
                scores.push({'type':types[j],'score':Math.random()*100});
            }
            // there are 500 different classes that they can take
            class_id = Math.floor(Math.random()*501); // get a class id between 0 and 500
            record = {'student_id':index, 'scores':scores, 'class_id':class_id};
            bulkData.push({ insertOne : { "document" : record } })
        }
        initConnection()
            .then((db) => {
                const studentDb = db.db("student");
                const collection =  studentDb.collection(colName)  
                console.log("Bulk Data :", bulkData.length);
                collection.bulkWrite(bulkData, function(err, res) {
                    if (err) throw err;
                    //console.log("Connected Successfully",res);
                    process.send("Saved Successfully");
                    db.close();
                });       
            })
            .catch((err) => { console.log("Err :", err) });        
    }
    

    Sample project to insert millions of record in mongodb using child process fork

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-10-09
      • 2018-08-15
      • 2016-11-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多