【问题标题】:Asynchronous subscriber in Google PubSub with Async library带有异步库的 Google PubSub 中的异步订阅者
【发布时间】:2020-04-14 19:12:10
【问题描述】:

众所周知,Google PubSub javascript 库是异步的(发布者和订阅者是异步的)。根据我的阅读,我们不能在 JS 代码中使订阅者的执行异步,因为 JS 是单线程的。

  • 谁能解释一下当我订阅事件“消息”时它的实际工作原理?

  • 如果我连续收到 5 条消息,我的 MessageHandler 会阻塞直到它完成吗?

  • 如何在 JS 中异步处理消息?我听说过在异步库中使用队列(一个并行工作人员)。那么,与没有它相比,它是如何工作的呢?

提前感谢您的回答!

【问题讨论】:

    标签: javascript typescript async-await google-cloud-pubsub async.js


    【解决方案1】:

    执行是异步的,但 Javascript 的单线程特性意味着一次只执行一个异步回调。当您订阅事件“消息”时,每条传入消息都会触发事件以使用传入消息运行您的回调。这些将排队并一次运行一个。每个调用都会阻塞,直到它完成。

    使用 Javascript 实现消息并行处理的最佳方法是启动订阅者的多个实例。当有多个订阅者从同一个订阅接收消息时,Cloud Pub/Sub 会对消息进行负载均衡,向每个订阅者发送消息的子集。

    如果您只想运行一个实例,那么您有一些选择,尽管不是最佳选择。首先,如果您想通过事件循环在多个迭代中拆分工作,那么您可以使用setImmediate 告诉引擎在事件循环的下一次迭代中运行提供的回调。例如:

    const doExpensiveWork = message => {
      // Do some more expensive processing here.
      message.ack();
    }
    
    const messageHandler = message => {
      console.log(`Received message: ${message.id}`);
      // Do some work on message here.
      setImmediate(() => doExpensiveWork(message));
    };
    
    subscription.on('message', messageHandler);
    

    这将允许您同时在消息上取得一些进展,尽管执行块仍将连续发生。

    如果您希望在处理器中的不同内核之间进行并行处理并运行单个服务实例,则需要fork subprocesses

    【讨论】:

    • 感谢您的宝贵时间,您的回答很有帮助!是的,我更喜欢使用单个实例,但是具有异步队列和 1 个工作人员 (caolan.github.io/async/v3/docs.html#queue) 的解决方案是另一种异步处理它们的方式吗?
    • 另外一个问题,如果调用是阻塞的,而handler还是收到很多消息,会不会栈崩溃,消息丢失(瓶颈?)?
    • 异步库不会同时在多个内核上运行代码。它仅限于不使用分叉子进程的单线程执行。异步库只是让设置需要运行和可能聚合的异步执行变得更容易。接收大量消息可能会导致任务崩溃,但flow control 允许您限制同时未完成消息的数量,这会有所帮助。无论哪种方式,如果您的任务崩溃,消息将在它重新启动时重新传递。
    猜你喜欢
    • 2013-10-09
    • 1970-01-01
    • 2017-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-10
    • 2019-12-11
    相关资源
    最近更新 更多