【问题标题】:Run callback function after forEach is doneforEach 完成后运行回调函数
【发布时间】:2017-11-10 20:11:39
【问题描述】:

在项目中,我有一个遍历 url 列表的循环。它从每个 url 下载文件,并对下载的文件进行一些后期处理。

在完成所有过程(下载过程和发布过程)之后,我想执行一个回调函数。因为后期处理包括一些流式任务,所以它有关闭事件。如果可以识别最后一项,我可以将回调函数传递给关闭事件。但是,由于循环是异步的,我无法跟踪最后完成了哪个项目。

目前,我使用 5 秒超时来确保回调在整个过程之后执行。显然,这是不可持续的。有什么好的方法来处理这个?

循环代码:

exports.processArray = (items, process, callback) => {
    var todo = items.concat();
    setTimeout(function() {
        process(todo.shift());
        if(todo.length > 0) {
          // execute download and post process each second
          // however it doesn't guarantee one start after previous one done
          setTimeout(arguments.callee, 1000);
        } else {
          setTimeout(() => {callback();}, 5000);
        }
    }, 1000);
};

processArray(
  // First param, the array
  urlList,
  // Second param, download and post process
  (url) => {
    if(url.startsWith('http')) {
      getDataReg(url, uid);
    }
    else if(url.startsWith('ftp')) {
      getDataFtp(url, uid);
    }
    else {
      console.log('not a valid resource');
    }
  },
  // Third param, callback to be executed after all done
  () => {
    Request.get(`${config.demouri}bound=${request.query.boundary};uid=${uid}`, {
      method: 'GET',
      auth: auth
    })
    .on('response', (response) => {
      console.log('response event emmits');
      zipFiles(uid)
      .then((path) => {
        reply.file(path, { confine: false, filename: uid + '.zip', mode: 'inline'}).header('Content-Disposition');
      });
    });
  }
);

下载和发布过程:

exports.getDataFtp = (url, uid) => {
  console.log('get into ftp');
  var usefulUrl = url.split('//')[1];
  var spliter = usefulUrl.indexOf('/');
  var host = usefulUrl.substring(0, spliter);
  var dir = usefulUrl.substring(spliter+1, usefulUrl.length);
  var client = new ftp();
  var connection = {
    host: host
  };
  var fileNameStart = dir.lastIndexOf('/') + 1;
  var fileNameEnd = dir.length;
  var fileName = dir.substring(fileNameStart, fileNameEnd);
  console.log('filename: ', fileName);

  client.on('ready', () => {
    console.log('get into ftp ready');
    client.get(dir, (err, stream) => {
      if (err) {
        console.log('get file err:', err);
        return;
      } else{
        console.log('get into ftp get');
        stream.pipe(fs.createWriteStream(datadir + `download/${uid}/${fileName}`));
        stream.on('end', () => {
          console.log('get into ftp close');
          unzipData(datadir + `download/${uid}/`, fileName, uid);
          client.end();
        });
      }
    });
  });
  client.connect(connection);
};

exports.getDataReg = (url, uid) => {
  console.log('get into http');
    var fileNameStart = url.lastIndexOf('/') + 1;
  var fileNameEnd = url.length;
  var fileName = url.substring(fileNameStart, fileNameEnd);
    var file = fs.createWriteStream(datadir + `download/${uid}/${fileName}`);
    if (url.startsWith('https')) {
    https.get(url, (response) => {
      console.log('start piping file');
      response.pipe(file);
      file.on('finish', () => {
        console.log('get into http finish');
        unzipData(datadir + `download/${uid}/`, fileName, uid);
      });
    }).on('error', (err) => { // Handle errors
      fs.unlink(datadir + `download/${uid}/${fileName}`);
      console.log('download file err: ', err);
    });
    } else {
    http.get(url, (response) => {
      console.log('start piping file');
      response.pipe(file);
      file.on('finish', () => {
        unzipData(datadir + `download/${uid}/`, fileName, uid);
      });
    }).on('error', (err) => {
      fs.unlink(datadir + `download/${uid}/${fileName}`);
      console.log('download file err: ', err);
    });
    }
};

function unzipData(path, fileName, uid) {
  console.log('get into unzip');
  console.log('creating: ', path + fileName);
    fs.createReadStream(path + fileName)
    .pipe(unzip.Extract({path: path}))
    .on('close', () => {
    console.log('get into unzip close');
    var filelist = listFile(path);
    filelist.forEach((filePath) => {
      if (!filePath.endsWith('.zip')) {
        var components = filePath.split('/');
        var component = components[components.length-1];
        mv(filePath, datadir + `processing/${uid}/${component}`, (err) => {
          if(err) {
            console.log('move file err: ');
          } else {
            console.log('move file done');
          }
        });
      }
    });
    fs.unlink(path + fileName, (err) => {});
    });
}

【问题讨论】:

  • 通过“正确”处理异步代码来处理它 - 使用回调或承诺
  • 您可以在processArray() 调用中使用Promise 构造函数。不知道为什么nodejs 没有实现流标准以使用Promise 而不是回调。这个问题在过去一周左右出现了好几次。
  • 你能用代码示例回答这个问题吗?
  • 如果我使用回调,我应该什么时候执行回调? @JaromandaX

标签: javascript asynchronous foreach callback


【解决方案1】:

在完成所有过程(下载过程和发布过程)之后,我想执行一个回调函数。

一系列异步进程的有趣之处在于,您永远无法确切知道所有进程何时会完成。因此,为回调设置超时是一种快速而肮脏的方法,但它肯定不可靠。

您可以改用counter 来解决此问题。 假设您有 10 个操作要执行。一开始您将计数器设置为 10 counter = 10 并且在每个进程完成后,无论如何(它可能成功或失败),您都可以像 counter -= 1 一样将计数器减 1,然后您可以检查是否counter 为 0,如果是,则表示所有流程都已完成,我们到达了终点。你现在可以安全地运行你的回调函数,比如if(counter === 0) callback();


如果我是你,我会这样做:

*注意被调用的进程应该返回一个promise,这样我就可以知道它什么时候完成(不管如何)

*如果您需要有关 Promise 的帮助,这篇有用的文章可能会对您有所帮助:https://howtonode.org/promises

*哦,还有一件事,你应该避免使用arguments.callee,因为它已被弃用。这就是为什么Why was the arguments.callee.caller property deprecated in JavaScript?

exports.processArray = (items, process, callback) => {
    var todo = [].concat(items);
    var counter = todo.length;

    runProcess();

    function runProcess() {
      // Check if the counter already reached 0
      if(checkCounter() === false) {
        // Nope. Counter is still > 0, which means we got work to do.
        var processPromise = process(todo.shift());

        processPromise
          .then(function() {
            // success
          })
          .catch(function() {
            // failure
          })
          .finally(function() {
            // The previous process is done. 
            // Now we can go with the next one.
            --counter;
            runProcess();
          })
      }
    };

    function checkCounter() {
      if(counter === 0) {
        callback();
        return true;
      } else {
        return false;
      }
    }
};

【讨论】:

  • 就我而言,processArray 在一个文件中。所有其他功能都在另一个文件中。从我的角度来看,计数器将在 processArray 中设置。 Counter -= 1 将在 unzipData 中执行。我该如何处理?
  • @zhangjinzhou 其实我不喜欢长答案,也不喜欢长问题,但是我扩展了我的答案来告诉你,你如何解决这个问题。 :-)
  • 我从不使用 promise.finally。你有什么指示吗?
【解决方案2】:

您想要做的是让所有异步进程汇聚成一个 Promise,您可以使用该 Promise 在正确的时刻执行回调。

让我们从每个进程完成的那一刻开始,我假设这是在传递给unzipData() 中的mv() 函数的回调中。您希望将这些异步操作中的每一个包装在一个在回调中解析的 Promise 中,并且您还希望稍后使用这些 Promise,为此您使用 .map() 方法将 Promise 收集到一个数组中(而不是 .forEach()) .
代码如下:

var promises = filelist.map((filePath) => {
  if (!filePath.endsWith('.zip')) {
    var components = filePath.split('/');
    var component = components[components.length-1];
    return new Promise((resolve, reject) =>
      mv(filePath, datadir + `processing/${uid}/${component}`, (err) => {
        if(err) {
          console.log('move file err: ');
          reject(); // Or resolve() if you want to ignore the error and not cause it to prevent the callback from executing later
        } else {
          console.log('move file done');
          resolve();
        }
      }));
  }
  return Promise.resolve();
});

(如果不执行异步操作,则返回立即解析的 Promise)

现在,我们可以将这个 Promise 列表转换为一个 Promise,当列表中的所有 Promise 都已解析时,它会解析:

var allPromise = Promise.all(promises);

接下来,我们需要进一步查看代码。我们可以看到我们刚刚看到的代码本身就是异步操作的事件处理程序的一部分,即fs.createReadStream()。您需要将其包装在当内部承诺解决时得到解决的承诺中,这是unzipData() 函数应返回的承诺:

function unzipData(path, fileName, uid) {
  console.log('get into unzip');
  console.log('creating: ', path + fileName);
  return new Promise((outerResolve) =>
    fs.createReadStream(path + fileName)
    .pipe(unzip.Extract({path: path}))
    .on('close', () => {
      console.log('get into unzip close');
      var filelist = listFile(path);

      // Code from previous examples

      allPromise.then(outerResolve);
    }));
}

接下来,我们看看使用unzipData()的函数:getDataReg()getDataFtp()。它们只执行一个异步操作,因此您需要做的就是让它们返回一个当 unzipData() 返回的承诺解析时解析的承诺。
简化示例:

exports.getDataReg = (url, uid) => {
  return new Promise((resolve, reject) => {

    // ...

    https.get(url, (response) => {
      response.pipe(file);
      file.on('finish', () => {
        unzipData(datadir + `download/${uid}/`, fileName, uid)
          .then(resolve);
      });
    }).on('error', (err) => { // Handle errors
      fs.unlink(datadir + `download/${uid}/${fileName}`);
      reject(); // Or resolve() if you want to ignore the error and not cause it to prevent the callback from executing later
    });

    // ...

  });
}

最后,我们到达processArray() 函数,在这里您需要做与我们开始时相同的事情:将流程映射到一个promise 列表。首先,传递的process函数需要返回getDataReg()getDataFtp()返回的promise:

// Second param, download and post process
(url) => {
  if(url.startsWith('http')) {
    return getDataReg(url, uid);
  }
  else if(url.startsWith('ftp')) {
    return getDataFtp(url, uid);
  }
  else {
    console.log('not a valid resource');
  }
  return Promise.reject(); // or Promise.resolve() if you want invalid resources to be ignored and not prevent the callback from executing later
}

现在,您的 processArray() 函数可能如下所示:

exports.processArray = (items, process, callback) =>
  Promise.all(items.map(process))
    .then(callback)
    .catch(() => console.log('Something went wrong somewhere'));

您的回调将在所有异步操作完成后被调用,无论它们以何种顺序执行。如果任何一个 Promise 被拒绝,回调将永远不会被执行,因此请相应地管理您的 Promise 拒绝。

这是一个带有完整代码的 JSFiddle:https://jsfiddle.net/upn4yqsw/

【讨论】:

  • 完美答案!您清楚地理解并解释了我的问题,并提供了一个很好的解决方案。谢谢!
【解决方案3】:

一般来说,由于nodejs 似乎没有将Streams Standard 实现为基于Promise,至少从可以收集的信息来看;而是使用基于事件或回调机制,您可以在函数调用中使用Promise 构造函数,在调度特定事件时将return 实现为Promise 对象

const doStuff = (...args) => new Promise((resolve, reject)) => {
  /* define and do stream stuff */
  doStreamStuff.on(/* "close", "end" */, => {
    // do stuff
    resolve(/* value */)
  })
});

doStuff(/* args */)
.then(data => {})
.catch(err => {})

【讨论】:

  • 问题是我无法决定何时执行回调。如您所知,这是一个循环。它不告诉最后一项何时完成(我相信这是执行回调的点)。
  • 使用Promise.all()Array.prototype.map()代替Array.prototype.forEach(),见What is the point of promises in JavaScript?
  • 谢谢。我认为 Mikael Lennholm 使用代码来解释它。我想这就是你想要表达的。
猜你喜欢
  • 2019-03-25
  • 2018-10-30
  • 2018-11-05
  • 1970-01-01
  • 1970-01-01
  • 2013-07-07
  • 1970-01-01
  • 1970-01-01
  • 2013-09-29
相关资源
最近更新 更多