【问题标题】:AMQP/RabbitMQ - Process messages sequentiallyAMQP/RabbitMQ - 按顺序处理消息
【发布时间】:2016-05-30 08:10:08
【问题描述】:

我有一个直接交换。还有一个队列,绑定到这个交换。

该队列有两个消费者。消费者在完成相应的处理后手动确认消息。

消息按逻辑排序/排序,并应按该顺序处理。是否可以强制在消费者 A 和消费者 B 之间按顺序接收和处理所有消息?也就是说,防止A和B同时处理消息。

注意:消费者共享相同的连接和/或通道。这意味着我不能使用<channel>.basicQoS(1);

这个问题的基本原理:两个消费者是相同的。如果其中一个出现故障,另一个队列将开始处理消息,并且一切都会继续工作,无需任何干预。

【问题讨论】:

  • 单身活跃消费者rabbitmq.com/consumers.html#single-active-consumer 怎么样?多个消费者绑定,但所有消息只发送到第一个。如果它死了,则将消息发送到第二个。你得到你的冗余和处理顺序被保留。
  • 单个活动消费者看起来确实比独占绑定更干净,但确实需要 RabbitMQ 3.8

标签: rabbitmq amqp high-availability


【解决方案1】:

在您需要冗余消费者但需要以特定顺序处理消息的情况下,一种处理故障转移的方法是在设置绑定到队列时使用独占消费者选项,并让两个消费者不断尝试即使他们无法获得排他锁,也要绑定。

过程是这样的:

  1. 消费者 A 首先启动并作为独占消费者绑定到队列。消费者 A 开始处理队列中的消息。
  2. 接下来消费者 B 开始并尝试以独占消费者的身份绑定到队列,但由于队列已经有独占消费者而被拒绝。
  3. 消费者 B 反复尝试在队列中获取独占绑定但被拒绝。
  4. 托管消费者 A 的进程崩溃。
  5. 消费者 B 尝试作为独占消费者绑定到队列,并且这次成功。消费者 B 开始处理队列中的消息。
  6. 消费者 A 重新联机,并尝试独占绑定,但现在被拒绝。
  7. 消费者 B 继续按 FIFO 顺序处理消息。

虽然这种方法不提供负载分担,但它确实提供了冗余。

【讨论】:

    【解决方案2】:

    即使这已经被回答了。可能这可以帮助其他人。 ActiveMQ 有一个称为 Single Active Consumer 的功能,它与您的情况相匹配。

    我们可以将 N 个消费者附加到一个队列,但其中只有 1(一个)会主动消费队列中的消息。仅当活动消费者失败时才会发生故障转移。

    请看链接https://www.rabbitmq.com/consumers.html#single-active-consumer

    谢谢

    【讨论】:

      【解决方案3】:

      通常,MQ 系统的重点是分配工作负载。当然,在某些情况下,消息 N 的处理取决于消息 N-1 的处理结果,甚至取决于 N-1 消息本身。

      如果 A 和 B 不能同时处理消息,那么为什么不只拥有 A 或只拥有 B?在我看来,拥有 2 个消费者并没有节省任何东西,只有当另一个消费者不工作时,一个消费者才能工作......

      在您的情况下,最好有一个消费者,但实际上在处理部分进行并行化(实际上不是一个词)。

      只是补充一点,RMQ 将消息均匀地分发给所有消费者(以循环方式),而不管任何标准。当然,这是当 prefetch 设置为 1 时,默认情况下是这样。有关here 的更多信息,请查找“公平调度”。

      【讨论】:

      • 感谢您的见解。回到你的问题then why not just have A or just B:你的理解是正确的,AB 不应该同时处理消息。所以确实是A or B。但是,我认为同时运行AB 会很有用:如果A(或B)崩溃,系统可以继续运行而无需任何(手动)干预。我从你那里了解到,我的方法是不可能的。但接下来的问题是:如何促进从A(或B)到B(或A)的正确故障转移?
      • 不客气。更简单的方法是为A 设置一个监控代理(看门狗),如果它崩溃,它将重新启动它,然后以任何方式从A 故障转移到B。消息不会丢失,它们会留在队列中,并在消费者再次启动时传递。
      • 好的,知道了。 RabbitMQ 是否提供高级故障转移功能?我知道有管理插件和 REST API,但这只是监控(afaik)。是否有任何用于自动故障转移的插件/库/工具(RabbitMQ 或第 3 方),您建议开始使用?
      • 我不知道故障转移功能,这是客户端,所以我认为服务器不应该处理这个问题。可能是有一些插件,但我从来不需要这样的东西,所以从来没有查过。 RabbitMQ 确实有心跳(实际上是 AMQP 功能)检查 - 也许这对你有一些用处。但是看门狗应该很容易在任何操作系统上编写 - 一个例子:每 n 秒拉一次进程列表,如果没有启动它,看看消费者是否在那里。
      • 好的,谢谢。我将首先研究心跳功能。我会把这个问题留几天,以防其他人有更具体的答案。再次感谢。
      猜你喜欢
      • 2019-03-02
      • 2019-05-04
      • 1970-01-01
      • 2020-12-03
      • 1970-01-01
      • 2012-12-26
      • 2014-05-18
      • 1970-01-01
      相关资源
      最近更新 更多