【问题标题】:RabbitMQ - Message order of deliveryRabbitMQ - 消息传递顺序
【发布时间】:2014-02-17 06:07:28
【问题描述】:

我需要为我的新项目选择一个新的队列代理。

这一次我需要一个支持 pub/sub 的可扩展队列,并且保持消息排序是必须的。

我读了亚历克西斯的评论:他写道:

“确实,我们认为 RabbitMQ 提供比 Kafka 更强的排序”

我阅读了 rabbitmq 文档中的消息排序部分:

"可以使用 AMQP 方法将消息返回到队列 重新排队 参数(basic.recover、basic.reject 和 basic.nack),或由于通道 在持有未确认的消息时关闭...使用 2.7.0 及更高版本 个人消费者仍然可以从外部观察消息 如果队列有多个订阅者,则排序。这是由于 其他可能重新排队消息的订阅者。从队列的角度 消息始终按发布顺序保存。”

如果我需要按顺序处理消息,我只能使用rabbitMQ和每个消费者的排他队列吗?

RabbitMQ 是否仍然被认为是有序消息队列的良好解决方案?

【问题讨论】:

    标签: queue rabbitmq message-queue


    【解决方案1】:

    好吧,让我们仔细看看您在上面描述的场景。我认为在问题中的 sn-p 之前粘贴 the documentation 以提供上下文很重要:

    AMQP 0-9-1 核心规范的第 4.7 节解释了 保证排序的条件:发布的消息 一个通道,经过一个交换和一个队列和一个 传出通道将按照与它们相同的顺序被接收 发送。自 2.7.0 版以来,RabbitMQ 提供了更强大的保证。

    可以使用 AMQP 方法将消息返回到队列 重新排队参数(basic.recover、basic.reject 和 basic.nack),或 由于通道关闭而持有未确认的消息。任何 这些场景导致消息在后台重新排队 早于 2.7.0 的 RabbitMQ 版本的队列。从 RabbitMQ 发布 在 2.7.0 中,消息始终按发布顺序保留在队列中,即使存在重新排队或通道关闭。(已添加重点)

    因此,很明显,从 2.7.0 开始,RabbitMQ 在消息排序方面对原始 AMQP 规范进行了相当大的改进。

    对于多个(并行)消费者,无法保证处理顺序。
    第三段(粘贴在问题中)继续给出免责声明,我将解释:“如果队列中有多个处理器,则不再保证消息将按顺序处理。”他们在这里只是说 RabbitMQ 不能违背数学定律。

    考虑一家银行的一系列客户。这家特殊的银行以帮助客户按照进入银行的顺序而自豪。顾客排队等候,由 3 位可用的柜员中的下一位提供服务。

    今天早上,碰巧所有三个柜员都同时有空,接下来的 3 个客户走近了。突然,三个柜员中的第一个病得很重,无法为排队的第一个顾客服务。发生这种情况时,柜员 2 已与客户 2 结束,柜员 3 已开始为客户 3 服务。

    现在,可能会发生以下两种情况之一。 (1)第一个排队的顾客可以回到队头,或者(2)第一个顾客可以抢占第三个顾客,导致该柜员停止为第三个顾客工作,而开始为第一个顾客工作。 RabbitMQ 不支持这种类型的抢占逻辑,我知道的任何其他消息代理也不支持。在任何一种情况下,第一个客户实际上最终都不会首先得到帮助——第二个客户会得到帮助,因为幸运地得到了一个好的、快速的出纳员。保证客户得到帮助的唯一方法是让一名柜员一次帮助一名客户,这将给银行带来重大的客户服务问题。

    我希望这有助于说明您所询问的问题。鉴于您有多个消费者,不可能确保在每种可能的情况下都按顺序处理消息。如果您有多个队列、多个独占消费者、不同的代理等,则无关紧要 - 无法保证 先验 多个消费者按顺序回答消息。但 RabbitMQ 会尽力而为。

    【讨论】:

    • 有没有办法配置rabbit在队列末尾而不是前面重新排队消息?
    • @Ryan:不,你不能。但是有一个解决方法:您可以克隆消息并将其发布到同一个队列中,就像一条全新的消息一样,然后它将进入队列的末尾。在这种情况下,消息的属性redelivered 将是false,而不是像普通重新队列那样的true
    • Kafka 允许通过分区的方式与应用级别定义的部分顺序进行并行化,这对于实际任务非常实用。 RabbitMQ 似乎要么提供没有并行化的全局顺序,要么根本没有顺序。谁的保证更好? )
    • @bazza - 作为一个新问题提出,我会试一试:)
    • @theMayer 这取决于;我看到其他开发人员使用 RabbitMQ 的一些用途只是作为 tcp 套接字替代品,没有任何并行消息传递架构的提示。 RabbitMQ 提供的其他特性(如持久性等)本身就很有吸引力!像这样使用(所以根本不使用消息传递模式,这几乎是一种讽刺)保留消息顺序很有用。
    【解决方案2】:

    消息排序保留在 Kafka 中,但仅在分区内而不是全局内。如果您的数据需要全局排序和分区,这确实会使事情变得困难。但是,如果您只需要确保同一用户的所有相同事件等...最终位于同一分区中以便正确排序,您可以这样做。生产者负责他们写入的分区,因此如果您能够对数据进行逻辑分区,这可能会更好。

    【讨论】:

      【解决方案3】:

      我觉得这个问题有两点不相似,消费顺序和加工顺序。

      消息队列可以 - 在一定程度上 - 保证消息将按顺序被使用,但是,它们不能保证消息的处理顺序。

      这里的主要区别是消息处理的某些方面在消费时无法确定,例如:

      • 如前所述,消费者在处理过程中可能会失败,这里消息的消费顺序是正确的,但是消费者未能正确处理它,这将使它回到队列中,直到现在消费顺序是仍然完好无损,但我们不知道现在的处理顺序如何

      • 1234563 3 的处理时间比预期的要长,那么消息 4 和 5 可能会在消息 3 之前被消耗并完成处理

      因此,即使您设法将消息返回到队列的前面(顺便说一句,这违反了消费顺序),您仍然不能保证下一条消息之前的所有消息都已完成处理。

      如果你想保证处理顺序那么:

      1. 始终只有 1 个消费者实例
      2. 或者不使用消息队列并以同步阻塞方法进行处理,这听起来可能很糟糕,但在许多情况下,业务需求是完全有效的,有时甚至是关键的

      【讨论】:

      • 这是准确的信息,但没有“消费顺序”的实际意义。消息处理是导致系统状态改变的原因。由于消息可以在“消费”之后但表面上在“处理”之前重新排队,所有这些处理都是处理器在完成之前的临时状态 - 希望你不关心。
      • 我还要补充一点,如果您使用选项一并且您正在以被动模式接收来自 RMQ 的事件,并且您使用 NodeJS 中的事件循环,您需要使用预取为 1 的单个通道因为否则你可能会以不同的速度并行处理多条消息。
      • 解决此问题的一种方法是使用带有消息的订单号,并且消费者会跟踪消息及其在 DB 中的顺序。如果消费者 A 尚未完成对 entity-record-id-100 的 message-order-1 的处理,则消费者 B 不应开始处理 entity-record-id-100 的 message-order-2。消费者 B 应该等待并在等待的循环中重试。
      【解决方案4】:

      有适当的方法来保证 RabbitMQ 订阅中的消息顺序。

      如果您使用多个消费者,他们将使用共享的ExecutorService 处理消息。另见ConnectionFactory.setSharedExecutor(...)。你可以设置一个Executors.newSingleThreadExecutor()

      如果您将一个Consumer 与单个队列一起使用,您可以使用多个bindingKeys 绑定此队列(它们可能有通配符)。消息将按照消息代理接收它们的顺序放入队列中。

      例如,您有一个发布者发布重要顺序的消息:

      try (Connection connection2 = factory.newConnection();
              Channel channel2 = connection.createChannel()) {
          // publish messages alternating to two different topics
          for (int i = 0; i < messageCount; i++) {
              final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
              channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
          }
      }
      

      您现在可能希望接收来自 队列 中两个 主题 的消息,它们的发布顺序与它们的发布顺序相同:

      // declare a queue for the consumer
      final String queueName = channel.queueDeclare().getQueue();
      
      // we bind to queue with the two different routingKeys
      final String routingEven = "even";
      final String routingOdd = "odd";
      channel.queueBind(queueName, exchange, routingEven);
      channel.queueBind(queueName, exchange, routingOdd);
      channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });
      

      Consumer 现在将按照消息发布的顺序接收消息,而不管您使用不同的主题

      RabbitMQ 文档中有一些不错的 5 分钟教程可能会有所帮助: https://www.rabbitmq.com/tutorials/tutorial-five-java.html

      【讨论】:

        猜你喜欢
        • 2014-11-30
        • 2014-08-11
        • 1970-01-01
        • 2015-10-02
        • 2015-12-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多