【问题标题】:Adding JS functions to a parallel queue将 JS 函数添加到并行队列
【发布时间】:2015-03-28 05:13:49
【问题描述】:

我有这段代码,我想运行所有数据库插入,然后在脚本结束时断开与数据库的连接。问题是我需要在循环结束后执行断开回调,但它是不确定的。所以我需要为每个循环创建一个函数,然后只有在所有这些函数完成后,调用该函数以断开与 mongoose/mongoDB 的连接。

有人看到我在这段代码中遇到的问题吗?

//这段代码很接近,但不完全,因为它会在执行很多插入/保存之前断开与 mongo 的连接

var mongoose = require('mongoose')
    , Admin = mongoose.mongo.Admin;

var UserModel = require('../models/UserModel');
UserModel.registerSchema(mongoose);


var fs = require('fs');
var parsedJSON = JSON.parse(fs.readFileSync('../dummy_data/dummy_user_data', 'utf8'));


var system_db = mongoose.connect('mongodb://localhost:27017/local_dev_db');


function insertUsers(callback){


    parsedJSON.forEach(function (item, index) {

        var User = UserModel.getNewUser(system_db);

        var user = new User({
            username: item.username,
            password: item.password,
            address: item.address,
            phone: item.phone,
            email: item.email,
            gender: item.gender,
            about: item.about,
            latitude: item.latitude,
            longitude: item.longitude
        });

        user.save(function (err, result) {
            if (err) {
                console.log("error in player save method:", err);
            }
            console.log(index);
            if (result) {
                //console.log('Added!', result);
            }
        });

    });
    callback();
}


function disconnect(){
    mongoose.disconnect();
    mongoose.connection
        .close(function () {

            console
                .log('Mongoose connection disconnected');

            process.exit(0);

        });
}

insertUsers(disconnect);

所以解决方案是使用 async.parallel 库,只有在所有函数完成后,才调用最终回调函数。但是,我如何以编程方式执行此操作,而不是将一组已知函数放入 async.parallel 代码块中,我需要将未知数量的函数放入 aysnc.parallel 代码块中。看到了吗?

【问题讨论】:

    标签: javascript node.js mongodb asynchronous


    【解决方案1】:

    在这种特殊情况下,async.each() 实际上比 async.parallel() 更适合。 async.each() 是并行的,而 async.eachSeries() 不是。代码如下:

    function insertUsers(callback){
      var User = UserModel.getNewUser(system_db);
    
      async.each(parsedJSON, function (item, eachCb) {
        var user = new User({
            username: item.username,
            password: item.password,
            address: item.address,
            phone: item.phone,
            email: item.email,
            gender: item.gender,
            about: item.about,
            latitude: item.latitude,
            longitude: item.longitude
        });
    
        user.save(function (err, result) {
            if (err) {
                console.log("error in player save method:", err);
            }
            console.log(index);
            if (result) {
                //console.log('Added!', result);
            }
            eachCb();  // if calling with eachCb(err), async.each()
                       // will not continue the rest of the items
                       // in case of error occurs
        });
    
      }, function(err) {
        callback();    // done with all user.save() calls
      });
    }
    

    【讨论】:

      【解决方案2】:

      绝对可以。只需将您的函数动态添加到将传递给async.parallel 的数组中:

      var fs = require('fs');
      var async = require('async');
      
      var mongoose = require('mongoose'),
          Admin = mongoose.mongo.Admin;
      
      var UserModel = require('../models/UserModel');
      UserModel.registerSchema(mongoose);
      
      var parsedJSON = JSON.parse(fs.readFileSync('../dummy_data/dummy_user_data', 'utf8'));
      
      var system_db = mongoose.connect('mongodb://localhost:27017/local_dev_db');
      
      var insertFunctions = [];
      
      parsedJSON.forEach(function (item, index) {
      
          insertFunctions.push(function(callback) {
      
              var User = UserModel.getNewUser(system_db);
      
              var user = new User({
                  username: item.username,
                  password: item.password,
                  address: item.address,
                  phone: item.phone,
                  email: item.email,
                  gender: item.gender,
                  about: item.about,
                  latitude: item.latitude,
                  longitude: item.longitude
              });
      
              user.save(function (err, result) {
      
                  if (err) {
                      console.log("error in player save method:", err);
                      callback(err);
                      return;
                  }
      
                   callback(null, result);
      
              });
          });
      });
      
      function disconnect() {
      
          mongoose.disconnect();
          mongoose.connection
              .close(function () {
      
                  console
                      .log('Mongoose connection disconnected');
      
                  process.exit(0);
              });
      }
      
      //First parameter is the array of functions to run in parallel,
      // second parameter is the callback function
      async.parallel(insertFunctions, disconnect);
      

      【讨论】:

      • 看起来不错,不过我会取出 if(result){} 块,如果 IMO 不需要这样做
      • 实际上@gregnr async.series 给了我预期的控制台输出,但由于某种原因 async.parallel 给了我非常意想不到的控制台输出,不是因为它是无序的,而是因为 90% 的条目丢失了,那不应该发生。但 async.series 有预期的输出,全部 100%,当然是有序的。为什么会这样?
      • 在你的 user.save 函数和回调函数之前加入一个 console.log。你会得到什么样的结果?
      • 使用 async.parallel 我得到了完全奇怪的输出,比如 7 个条目而不是 130 个,并且 console.log 输出在循环中但在 user.save 之外根本不会被记录。使用 async.series,一切都按预期工作。
      • insertFunctions 数组在调用 async.parallel 后不应增长,因为 forEach() 是同步的。在您致电async.parallel 之前,请先拨打console.log(insertFunctions.length)。应该是一致的。
      猜你喜欢
      • 1970-01-01
      • 2011-02-11
      • 1970-01-01
      • 2014-05-11
      • 2012-10-16
      • 2017-07-03
      • 2016-04-05
      • 2021-09-05
      • 1970-01-01
      相关资源
      最近更新 更多