【问题标题】:AMQP basic.get concurrent consumers pulling from QueueAMQP basic.get 并发消费者从队列中拉取
【发布时间】:2023-07-30 17:00:01
【问题描述】:

当使用 RabbitMQ 作为消息代理时,我有一个场景,多个并发消费者使用 basic.get AMQP 方法从队列中提取消息,并使用显式确认从队列中删除消息。假设以下设置

Q 有消息 M1、M2、M3 和消费者 C1、C2 和 C3(每个都有自己的连接和通道)连接到它。

  1. 在 basic.get 方法中如何处理并发?对 basic.get 方法的调用是否同步以处理每个使用自己的连接和通道的并发消费者? C1、C2 和 C3 发出 basic.get 调用同时接收消息(假设服务器同时接收所有 3 个请求)。

  2. C1 使用 basic.get 请求消息并获取 M1。当C2请求消息时,由于它使用不同的连接,它是否再次获得M1?

  3. 消费者如何按预定义大小批量提取消息?

【问题讨论】:

  • 不清楚你的问题是什么。
  • 第二点是NO。其他的看不懂! “同步处理并发消费者”是什么意思! RMQ 是一个 FIFO 系统,因此您放置 3 条消息,C1 获得第一个,C2 获得第二个......等等......假设您使用的是 prefetch = 1。
  • 下面我的回答有帮助吗?如果有,请将其标记为答案。如果没有,我该如何详细说明?

标签: rabbitmq message-queue amqp


【解决方案1】:

您的问题确实触及了排队和流程理论的核心,所以我将从这个角度回答(就我的回答而言,RabbitMQ 确实是一个通用的消息代理,因为这适用于任何消息代理)。

在 basic.get 方法中如何处理并发?是打电话给 basic.get 方法同步处理并发消费者每个 使用自己的连接和通道? C1、C2 和 C3 发出 basic.get 同时调用接收消息(假设服务器接收到 同时所有 3 个请求)。

答案 1:RabbitMQ 被设计成一个可靠的消息代理。它包含内部流程和控制,以确保同一消息不会多次传递给不同的消费者。现在,由于测试您描述的场景不切实际,它是否完美运行?谁知道。这就是为什么使用基于消息的架构的正确设计的应用程序将使用幂等事务,这样如果多次处理相同的事务,结果将与处理一次的结果相同。 要点:设计您的应用程序,让这个问题的答案不重要。

C1 使用 basic.get 请求消息并获取 M1。当 C2 请求时 对于一条消息,由于它使用不同的连接,它是否得到 M1 再来一次?

答案 2:不。根据我之前的答案的假设,RabbitMQ 代理将不会在传递相同的消息后返回它。根据通道和队列的设置,消息可能会在传递时自动确认,并且永远不会重新传递。其他设置将在处理线程/通道“死亡”或处理线程的否定确认时自动重新排队。这是一个重要的功能,因为如果可以将“毒药”消息提供给多个消费者,它可能会反复对您的应用程序造成严重破坏。 要点:您在设计应用程序时可以放心地依赖此假设。

消费者如何按预定义大小批量提取消息?

回答:他们不能,也没有意义。在任何排队系统中,基本假设是项目从单个文件中的队列中删除。试图违反这个假设会导致不可预测的行为;此外,单件流通常是最有效的加工方法。然而,在现实世界中,有些情况下批量大小 > 1 是必要的。在这种情况下,将批处理加载到它自己的单个消息中是有意义的,因此这可能需要一个单独的处理线程来从队列中提取消息并将它们批处理在一起,或者最初将它们分批处理。请记住,一旦您有多个消费者,就无法保证按顺序处理单个消息。 要点:应尽可能避免使用批处理,但如果无法避免,您不能假设批处理将包含按任何特定顺序的单个消息。

【讨论】:

    【解决方案2】:

    您可能想阅读RabbitMQ Api guideintroduction to Amqp

    首先,避免在您的消费者中使用basicGet 来消费消息。而是使用消费者接口basicConsume。这允许 RabbitMq 在消息到达队列时向您推送消息。归根结底,这一切都归结为繁忙的轮询。

    当使用basicConsume RabbitMq 时,甚至会在后台向您推送更多消息,直至达到一定的prefetch 计数。这使您可以同时处理多条消息,并最大限度地减少等待下一条消息处理所需的时间(如果某些消息可用)。

    并发根本不是问题,这就是您使用队列的目的! 当一个队列上有多个消费者时,一条消息将始终只传递给一个消费者(只要消息被确认)。否则,您需要为每个消费者设置私有队列并相应地路由您的消息。

    顺便说一句,如果您能够在您的消费者之间共享连接,您应该这样做。 只需确保每个线程使用一个通道即可。

    【讨论】:

    • 有正当理由使用拉取方法与推送方法来检索消息。
    • 当然,使用 pull 方法是有充分理由的!我只是不认为这是典型的用例。
    • 这确实是程序语义的问题,因为应用程序的结果行为是相同的。
    • 使用拉取方法与推送方法的正当理由是什么?每个人都说这是错的,没有人说什么时候是对的。
    • 从架构的角度来看,至少在当前版本的RabbitMQ中,pull方式更容易实现。它也不太复杂,所需的设置更少。当消费者准备好接收消息时,它会收到一条消息。它还与现代制造系统理论相似,后者通过kanban 强调“拉动”。
    【解决方案3】:

    该场景不需要特殊配置。每个客户端都会自动从队列中获取并接收一条消息,就像您希望发生的那样。

    【讨论】:

    • 那么,RabbitMQ 是否处理同步部分?它是否对 basic.get 请求实施了一些锁定?
    • 改写我的问题:basic.get 上的同步是如何处理的?对 basic.get 请求是否有一些锁定?