【问题标题】:How to implement async/await in event handler?如何在事件处理程序中实现异步/等待?
【发布时间】:2019-12-18 14:08:53
【问题描述】:

我在事件处理程序中实现 await 时遇到问题。出于某种原因,事件处理程序在开始新进程之前不会等待第一个进程完成。为什么会出现这种奇怪的行为?

const { EventEmitter } = require("events");

let alreadyRunning = false;

const sampleEventHandler = new EventEmitter();
sampleEventHandler.on("data", async (message) => {
  await heavyProcess(message);
});

async function heavyProcess(message) {
  console.log("New message: ", message);
  console.log("already running?: ", alreadyRunning);
  if (alreadyRunning) {
    console.log("Why doesn't it await for first task to complete fully before entering here?");
  }

  const rand = Math.random() * 1000;
  // set var here
  alreadyRunning = true;
  await sleep(rand);
  // unset here
  alreadyRunning = false;
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Emit event 5 times
for (let i = 0; i < 5; i++) {
  sampleEventHandler.emit("data", i);
}

【问题讨论】:

    标签: node.js es6-promise eventemitter


    【解决方案1】:

    您需要promisify heavyProcess 函数才能成功使用await:

    function heavyProcess(message) {
        return new Promise( async (resolve, reject) => {
            console.log("New message: ", message);
            console.log("already running?: ", alreadyRunning);
            if (alreadyRunning) {
                console.log("Why doesn't it await for first task to complete fully before entering here?");
            }
    
            const rand = Math.random() * 1000;
            // set var here
            alreadyRunning = true;
            await sleep(rand);
            // unset here
            alreadyRunning = false;
            resolve('Done');
        });
    }
    

    【讨论】:

      【解决方案2】:

      编辑: 它没有被丢弃的原因是因为你只在做console.log。在它下面添加一个return。

      async function heavyProcess(message) {
        console.log("New message: ", message);
        console.log("already running?: ", alreadyRunning);
        if (alreadyRunning) {
          console.log("Why doesn't it await for first task to complete fully before entering here?");
           return // Actually stop executing
        }
      
        const rand = Math.random() * 1000;
        // set var here
        alreadyRunning = true;
        await sleep(rand);
        // unset here
        alreadyRunning = false;
      }
      

      编辑 2: 在 cmets 之后:OP 所需的行为是,heavyProcess 应该只发生一次,并且只有在进程完成时才开始一个新的。

      现在每个事件都在启动一个新的 HeavyProcess,因为 await 只是等待结果,而不是阻塞。

      您必须自己在事件中引入阻塞(或使用 RXJS 排气映射或类似的东西)。

      let isProcessing = false;
      
      sampleEventHandler.on("data", (message) => {
        if (isProcessing) { return; }
        isProcessing = true;
        const resetProcessing = () => isProcessing = false
        heavyProcess(message).then(resetProcessing, resetProcessing); // Reset of both complete and error
      

      【讨论】:

      • 您是否暗示任务在事件处理程序中并行执行?
      • 是的,每个事件都会执行该函数,从而触发heavyProcess。这也应该反映在您的日志记录中。 (新消息日志将连续弹出 5 次)
      • 我的问题是为什么如果一个新任务在一个已经在进行中的情况下被触发?为什么不等?我编写的代码只是重现我在其他地方面临的行为的示例。
      • 这就是 JavaScript 的异步特性。 await 关键字表示你暂停函数的执行,直到你得到你正在等待的函数的响应。在您的示例中,这发生在每个事件中。你等待一个新的heavyProcess完成。
      • 等不及了。发射循环不会等待。它同步调用每个侦听器,然后丢弃结果。如果您想要一个像内置 Promise.all 一样等待的发出循环,请获取 eventemitter 模块的源代码并编写一个。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-22
      相关资源
      最近更新 更多