【问题标题】:Bull Queue Concurrency QuestionsBull 队列并发问题
【发布时间】:2019-06-05 12:42:42
【问题描述】:

我需要帮助了解 Bull Queue (bull.js) 如何处理并发作业。

假设我有 10 个 Node.js 实例,每个实例都实例化一个连接到同一个 Redis 实例的 Bull Queue:

const bullQueue = require('bull');
const queue = new bullQueue('taskqueue', {...})
const concurrency = 5;
queue.process('jobTypeA', concurrency, job => {...do something...});

这是否意味着在全球所有 10 个节点实例中最多将有 5 个(并发)并发运行的类型为 jobTypeA 的作业?还是我理解错了,并发设置是per-Node instance?

如果一个 Node 实例指定不同的并发值会怎样?

我可以确定作业不会被多个节点实例处理吗?

【问题讨论】:

  • 这里说的是 BullMQ(看起来像一个抛光的 Bull 重构),并发因子是每个 worker,所以如果 10 的每个实例有 1 个并发因子为 5 的 worker,你应该得到 50 global并发因素,如果一个实例具有不同的配置,它可能只会收到更少的作业/消息,假设它比其他机器更小,至于你的最后一个问题,Stas Korzovsky 的回答似乎很好地涵盖了你的最后一个问题。

标签: javascript node.js concurrency bull.js


【解决方案1】:

TL;DR 是:在正常情况下,作业只被处理一次。如果出现问题(比如 Node.js 进程崩溃),作业可能会被双重处理。

引自公牛官方README.md

重要提示

队列的目标是“至少一次”工作策略。这意味着在某些情况下,可以多次处理作业。这主要发生在工作人员在整个处理过程中未能保持给定作业的锁定时。

当工作人员正在处理一项工作时,它将保持工作“锁定”,这样其他工作人员就无法处理它。

了解锁定如何工作以防止您的作业失去锁定 - 成为停滞 - 并因此重新启动非常重要。锁定是通过在间隔lockRenewTime(通常是lockDuration 的一半)上为lockDuration 创建一个锁定来实现的。如果lockDuration在锁可以更新之前过去了,作业将被视为停顿并自动重新启动;它将被双重处理。这可能发生在以下情况:

  1. 运行作业处理器的 Node 进程意外终止。
  2. 您的作业处理器占用过多 CPU 并停止了 Node 事件循环,因此,Bull 无法更新作业锁(请参阅 #488 了解如何更好地检测到这一点)。您可以通过将作业处理器分成更小的部分来解决此问题,这样任何一个部分都不会阻塞 Node 事件循环。或者,您可以为 lockDuration 设置传递一个更大的值(权衡是识别真正停滞的作业需要更长的时间)。

因此,您应该始终监听stalled 事件并将其记录到您的错误监控系统中,因为这意味着您的作业可能会被双重处理。

作为一种保护措施,有问题的作业不会无限期地重新启动(例如,如果作业处理器使其 Node 进程崩溃),作业将从停滞状态中恢复最多 maxStalledCount 次(默认值:1) .

【讨论】:

    【解决方案2】:

    由于面对a problem with too many processor threads,我花了很多时间研究它。

    简而言之,公牛的并发是在队列对象级别,而不是队列级别。

    如果您深入研究代码,则会在您对队列对象调用 .process 时调用并发设置。这意味着即使在同一个 Node 应用程序中,如果您创建多个队列并多次调用 .process,它们也会增加可处理的并发作业数。

    一位贡献者发布了以下内容:

    是的,当我第一次使用 Bull 时,我也有点惊讶 时间。队列选项永远不会保留在 Redis 中。你可以拥有尽可能多的 根据需要为每个应用程序队列实例,每个可以有不同的 设置。注册时设置并发设置 处理器,它实际上是特定于每个 process() 函数调用的,而不是 队列。如果您使用命名处理器,您可以调用 process() 多个 次。每个调用都会注册 N 个事件循环处理程序(使用 Node 的 process.nextTick()),按并发量(默认为1)。

    所以你的问题的答案是:是的,如果你在多个节点实例中注册进程处理程序,你的进程将被多个节点实例处理。

    【讨论】:

    • 在实现了每次队列注册.process()时并发“堆积”之后,这确实使水平扩展变得更难,弹性扩展更难。当你的主 nodejs 进程分布在 N 个节点/机器上时——也许作为 kubernetes 部署的副本,每个都将运行相同的代码,从而注册 .process() N 次。您必须明确更正此问题,以免在某些副本中调用 .process()。我认为人们期待这里有一个集中的并发管理——毕竟 Bull 已经使用了 Redis;不幸的是,Bull 还没有提供这个。
    【解决方案3】:

    Bull 旨在以“至少一次”语义同时处理作业,尽管如果处理器正常工作,即没有停止或崩溃,它实际上是在“恰好一次”交付。但是,您可以将最大停滞重试次数设置为 0 (maxStalledCount https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queue),然后语义将是“最多一次”。

    话虽如此,我会尽量回答发帖者提出的两个问题:

    如果一个 Node 实例指定不同的并发值会怎样?

    我假设您的意思是“队列实例”。如果是这样,则在处理器中指定并发。如果并发性是 X,那么该给定处理器最多会同时处理 X 个作业。

    我可以确定作业不会被多个节点实例处理吗?

    可以,只要您的作业没有崩溃或您的最大停滞作业设置为 0。

    【讨论】:

      【解决方案4】:

      进一步研究,我认为 Bull 根本不处理分布在多个 Node 实例中的问题,因此行为充其量是未定义的。

      【讨论】:

      • 公牛工作分布良好,只要它们在唯一的 redis 上使用相同的主题。每头公牛消耗 redis 队列上的一个作业,并且您的代码定义每个节点最多可以同时处理 5 个,这应该是 50 个(似乎很多)。
      【解决方案5】:

      啊,欢迎!这是一个元答案,可能不是您所希望的,而是解决此问题的一般过程:

      您可以指定并发参数。 Bull 会打电话给您的 处理程序并行处理此最大值。

      我个人并不真正理解这一点或公牛提供的保证。由于不是很清楚:

      IMO 最大的事情是:

      我可以确定作业不会被多个节点处理吗 实例?

      如果独占消息处理是不变的并且会导致您的应用程序不正确,即使有很好的文档,我强烈建议对库进行尽职调查:p

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-08-26
        • 1970-01-01
        • 2010-12-06
        • 2021-08-14
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多