【问题标题】:Nodejs: Issue with async and forEach - Need to wait the async to resolveNodejs:异步和 forEach 的问题 - 需要等待异步解决
【发布时间】:2018-06-02 05:52:32
【问题描述】:

我对 node 还很陌生,我一直在学习和处理异步/承诺。现在我正在尝试创建一个从数据库(例如 10K 行)进行插入的进程,调用一个转换一列的 Web 服务,然后为修改后的数据进行插入。

所以,我做了一个 Oracle SQL 查询,并为结果做了一个 foreach:

let counter = 0;
var binds = [];

res.rows.forEach((row) => {

    var original_data = row[0];

    Transform(original_data).then(new_data => {

        counter++;

        binds.push([original_data,new_data]);

        if (counter % 1000 === 0){
            console.log(`1K rows`);
            doInserts(binds);
            binds = [];
        }

    });

});

我每 1000 行调用一次doInserts,所以我没有在 Oracle 上打开很多事务。

Transform 函数调用一个 Web 服务,该服务使用我需要的值进行解析。

function Transform(value){

    return new Promise(function(resolve, reject){
        var requestPath = `http://localhost:3000/transform/${value}`;
        var req = request.get(requestPath, function(err, response, body){
            if (!err && response.statusCode == 200){
                resolve(body);
            }else{
                reject("API didn't respond.");
            }
        }).end();
    });

}

但是,当 foreach 有 10K 行时,这会阻塞 Web 服务(我使用 request 库进行连接)。我在想,foreach 不会一次同步进行一次转换。

这可能是我不知道很多节点、异步、承诺......但我很困惑。有人可以帮忙吗?

【问题讨论】:

  • 哇,从哪里开始? :) 1) res.rows 来自哪里? 2) 你的forEach 关闭res.rows 是一个同步循环做异步工作,通常不是一个好主意。 3)似乎您的转换在本地主机上。你可以调用一个函数来完成这项工作吗? 4) 获取行的逻辑和执行插入的逻辑看起来有风险。如果中途失败了怎么办?如果这是您想要的批量大小,为什么不一次只获取 1000 行 - 那么您只需要确保您有办法查询下一个“未处理”集。 5)doInserts是如何实现的?
  • 如果要插入很多行,请确保使用 executeMany() 并且不要进行单行插入。并使用绑定。

标签: javascript node.js


【解决方案1】:

你同时做了很多请求。尝试设置一个并发。可以使用bluebird的Promise.map:http://bluebirdjs.com/docs/api/promise.map.html

await Promise.map(rows, async (row) => {
    const new_data = await Transform(row[0])
    ...
}, {concurrency: 3})  // allow max 3 request at the same time

【讨论】:

    【解决方案2】:

    您可以使用任何 Promise 库或 ES6 Promise 来收集一组 Promise 并一起解决它们。

    在这个例子中,我将使用bluebird

    const Promise = require('bluebird');
    
    async function resolvingManyPromises() {
        let counter = 0;
        let binds = [];
        let promiseArray = [];
    
        res.rows.forEach(row => {
            var original_data = row[0];
    
            promiseArray.push(Transform(original_data));
        });
    
        const resolvedPromises = await Promise.all(promiseArray);
    
        // Do something with the resolved values resolvedPromises
    }
    

    请注意,Promise.all 将尝试同时并行解析数组中的所有 Promise。如果您的数据库有连接数限制,某些调用可能会失败。

    【讨论】:

    • 谢谢杰森,但我认为这就是正在发生的事情。 Transform() 中的请求建立的连接过多,导致 API 停止响应。
    【解决方案3】:

    如果一个失败并且所有成功都丢失,则所选答案将拒绝(如果最后一个拒绝,则除了最后一个结果之外,您已经失去了所有结果)。

    这是适合您情况的代码,有关该代码的更多信息可以找到here它不使用bluebird而是使用lib.throttle(来自包含我编写的通常有用的函数的lib)

    //lib comes from: https://github.com/amsterdamharu/lib/blob/master/src/index.js
    const lib = require("lib");
    
    const Fail = function(reason){this.reason=reason;};
    const isFail = o=>(o&&o.constructor)===Fail;
    const isNotFail = o=>!isFail(o);
    const handleBatch = results =>{//this will handle results of a batch
      //failed are the requests that failed
      //you may want to save the ones that failed to file or something
      const failed = results.filter(isFail);
      const successes = results.filter(result=>!isFail(result));
      return doInserts(successes);
    };
    const processor = throttler => row =>
      throttler(//throttling Transform to max 10 active
        row=>
          Transform(row[0])
          .then(new_data =>[row[0],new_data])
        )(row)
        .catch(err=>new Fail([err,row]))//catch reject and resolve with fail object
    ;
    //start the process
    lib.batchProcess (handleBatch) (1000) (processor(lib.throttle(10))) ([]) (res.rows)
    .then(
      results=>console.log("Process done")
      ,err=>console.error("This should not happen:".err)
    );
    

    【讨论】:

      猜你喜欢
      • 2019-03-27
      • 1970-01-01
      • 1970-01-01
      • 2014-11-23
      • 1970-01-01
      • 1970-01-01
      • 2018-12-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多