【问题标题】:Node.js - want 5 parallel calls to a method in a loopNode.js - 想要在循环中对一个方法进行 5 次并行调用
【发布时间】:2020-04-07 18:03:33
【问题描述】:

我在 MongoDB 集合中有 1000 个信息文件。我正在编写一个查询来获取 1000 条记录,并且在一个循环中,我正在调用一个函数来将该文件下载到本地系统。因此,下载所有 1000 个文件是一个顺序过程。

我希望在下载过程中实现一些并行性。在循环中,我想一次下载 10 个文件,这意味着我想调用下载函数 10 次,完成 10 个文件下载后我想下载接下来的 10 个文件(这意味着我需要调用下载函数 10 次)。

我怎样才能实现这种并行性,或者有没有更好的方法来做到这一点?

我看到Kue npm,但是如何实现呢?顺便说一句,我是从 FTP 下载的,所以我使用 basic-ftp npm 进行 ftp 操作。

【问题讨论】:

  • 在这里放一些代码至少可以下载一个文件

标签: node.js express async.js kue


【解决方案1】:

async 库在这方面非常强大,一旦您了解了基础知识,也非常容易。

我建议您使用eachLimit,这样您的应用就不必担心分十个批次循环,它只会同时下载十个文件。

var files = ['a.txt', 'b.txt']
var concurrency = 10;

async.eachLimit(files, concurrency, downloadFile, onFinish);


function downloadFile(file, callback){

  // run your download code here
  // when file has downloaded, call callback(null)
  // if there is an error, call callback('error code')

}

function onFinish(err, results){

  if(err) {
    // do something with the error
  }

  // reaching this point means the files have all downloaded

}

异步库将并行运行downloadFile,向每个实例发送来自files 列表的条目,然后当列表中的每个项目完成时,它将调用onFinish

【讨论】:

  • 谢谢,@格雷厄姆。这很简单。但是在onFinish 方法中没有从results 返回。我想要来自downloadFile 方法的fileObject。当下载成功时,我正在调用这样的回调callback(null, file)
  • results 应该是一个文件数组,如果函数调用正确的话。如果没有看到你的代码,我很难知道发生了什么。
  • 下面是我的代码。 function downloadFile(file, callback){ console.log(file); callback(null, file); } function onFinish(err, results){ if(err) { console.log('error ', err); } console.log('completed ', results); } results 总是返回未定义。但在downloadFile 文件名中很好。
【解决方案2】:

没有看到你的实现,我只能提供一个通用的答案。

假设您的下载函数接收一个 fileId 并返回一个承诺,该承诺会在所述文件完成下载时解析。对于这个 POC,我将用一个 Promise 来模拟它,该 Promise 将在 200 到 500 毫秒后解析为文件名。

function download(fileindex) {
  return new Promise((resolve,reject)=>{ 
    setTimeout(()=>{ 
      resolve(`file_${fileindex}`);
    },200+300*Math.random());
  });
}

您有 1000 个文件,并希望以 100 次迭代下载它们,每次迭代 10 个文件。

让我们封装一些东西。我将声明一个接收起始 ID 和大小的函数,并返回 [N...N+size] ids

function* range(bucket, size=10) {
    let start = bucket*size, 
        end=start+size;
    for (let i = start; i < end; i++) {
        yield i;
    }
}

您应该创建 100 个“桶”,每个“桶”包含对 10 个文件的引用。

 let buckets = [...range(0,100)].map(bucket=>{
    return [...range(bucket,10)];
 });

此时,buckets的内容为:

 [
   [file0 ... file9]
   ...
   [file 990 ... file 999]
 ]

然后,使用for..of(支持异步)遍历您的存储桶

在每次迭代中,使用 Promise.all 将 10 次调用排队到 download

async function proceed() {
    for await(let bucket of buckets) { // for...of
       await Promise.all(bucket.reduce((accum,fileindex)=>{
           accum.push(download(fileindex)); 
           return accum; 
       },[]));
    }
}

让我们看一个正在运行的示例(只有 10 个存储桶,我们都在这里很忙 :D)

function download(fileindex) {
  return new Promise((resolve, reject) => {
    let file = `file_${fileindex}`;
    setTimeout(() => {
      resolve(file);
    }, 200 + 300 * Math.random());
  });
}

function* range(bucket, size = 10) {
  let start = bucket * size,
    end = start + size;
  for (let i = start; i < end; i++) {
    yield i;
  }
}

let buckets = [...range(0, 10)].map(bucket => {
  return [...range(bucket, 10)];
});

async function proceed() {
  let bucketNumber = 0,
    timeStart = performance.now();
  for await (let bucket of buckets) {
    let startingTime = Number((performance.now() - timeStart) / 1000).toFixed(1).substr(-5),
      result = await Promise.all(bucket.reduce((accum, fileindex) => {
        accum.push(download(fileindex));
        return accum;
      }, []));
    console.log(
      `${startingTime}s downloading bucket ${bucketNumber}`
    );
    await result;
    let endingTime = Number((performance.now() - timeStart) / 1000).toFixed(1).substr(-5);

    console.log(
      `${endingTime}s bucket ${bucketNumber++} complete:`,
      `[${result[0]} ... ${result.pop()}]`
    );
  }
}

document.querySelector('#proceed').addEventListener('click',proceed);
&lt;button id="proceed" &gt;Proceed&lt;/button&gt;

【讨论】:

    猜你喜欢
    • 2013-12-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多