【问题标题】:RabbitMQ: persistent message with Topic exchangeRabbitMQ:具有主题交换的持久消息
【发布时间】:2023-05-17 20:16:02
【问题描述】:

我是 RabbitMQ 的新手。

我已经建立了一个“主题”交流。消费者可以在发布者之后启动。我希望消费者能够接收在他们启动之前已经发送的消息,而这些消息还没有被消费。

交易所设置有以下参数:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

使用此参数发布消息:

delivery_mode => 2

消费者使用 get() 从交换中检索消息。

不幸的是,在任何客户端启动之前发布的任何消息都会丢失。我使用了不同的组合。

我想我的问题是交换不保存消息。也许我需要在发布者和消费者之间建立一个队列。但这似乎不适用于通过键路由消息的“主题”交换。

我应该如何进行?我使用Perl 绑定Net::RabbitMQ(应该没关系)和RabbitMQ 2.2.0

【问题讨论】:

    标签: rabbitmq amqp


    【解决方案1】:

    如果在消息发布时没有连接的消费者可用于处理消息,您需要一个持久队列来存储消息。

    交换器不存储消息,但队列可以。令人困惑的部分是交换可以标记为“持久”,但真正的意思是,如果您重新启动代理,交换本身仍然存在,但它确实不会 表示发送到该交换的任何消息都会自动持久化。

    鉴于此,这里有两个选择:

    1. 在您启动发布商自行创建队列之前,请执行管理步骤。您可以使用 Web UI 或命令行工具来执行此操作。确保将其创建为持久队列,这样即使没有活动的消费者,它也会存储路由到它的所有消息。
    2. 假设您的消费者被编码为总是在启动时声明(并因此自动创建)他们的交换和队列(并且他们声明它们是持久的),只需运行所有消费者至少一次启动任何发布者。这将确保您的所有队列都正确创建。然后,您可以关闭消费者,直到真正需要它们为止,因为队列将永久存储路由到它们的任何未来消息。

    我会选择#1。执行的步骤可能并不多,您始终可以编写所需步骤的脚本,以便可以重复这些步骤。此外,如果您的所有消费者都将从同一个队列中提取(而不是每个都有一个专用队列),那么这确实是最小的管理开销。

    队列是需要适当管理和控制的东西。否则,您最终可能会遇到流氓消费者声明持久队列,使用它们几分钟但再也不会。不久之后,您将拥有一个永久增长的队列,并且没有任何减少它的大小,以及即将到来的经纪人末日。

    【讨论】:

    • 好的,所以解决方案是在发布者脚本中声明固定客户端队列。当然,这需要我提前知道会有多少消费者。
    • 没错,假设每个消费者都需要自己的队列。但是您需要回答的主要问题是,“这些消费者是否需要您在它们出现之前发送的所有历史消息?”。如果他们不关心旧消息,他们可以在启动时声明自己的队列并从那时起接收所有消息,但不会更旧。
    • 应用程序“声明”队列,然后如果它们不存在,MQ 代理会创建它们。尽管对于侦听器应用程序声明队列而不是发送者应用程序是有意义的,但您遇到了您所看到的问题。在运行应用程序之前声明队列、声明交换、创建 vhost 等可能是最好的解决方案。
    【解决方案2】:

    正如 Brian 所提到的,交换器不存储消息,主要负责将消息路由到另一个交换器或队列。如果交换未绑定到队列,则发送到该交换的所有消息都将“丢失”。

    您不需要在发布者脚本中声明固定客户端队列,因为这可能无法扩展。队列可以由您的发布者动态创建,并使用交换到交换绑定在内部进行路由。

    RabbitMQ 支持交换到交换绑定,这将允许拓扑灵活性、解耦和其他好处。您可以在RabbitMQ Exchange to Exchange Bindings [AMPQ] 阅读更多信息

    RabbitMQ Exchange To Exchange Binding

    示例 Python 代码,用于在没有使用队列的消费者存在的情况下创建具有持久性的交换到交换绑定。

    #!/usr/bin/env python
    import pika
    import sys
     
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
     
     
    #Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
    channel.exchange_declare(exchange='data_gateway',
    exchange_type='fanout',
    durable=True,
    auto_delete=False)
     
    #Declares the processing exchange to be used.Routes messages to various queues. For internal use only
    channel.exchange_declare(exchange='data_distributor',
    exchange_type='topic',
    durable=True,
    auto_delete=False)
     
    #Binds the external/producer facing exchange to the internal exchange
    channel.exchange_bind(destination='data_distributor',source='data_gateway')
     
    ##Create Durable Queues binded to the data_distributor exchange
    channel.queue_declare(queue='trade_db',durable=True)
    channel.queue_declare(queue='trade_stream_service',durable=True)
    channel.queue_declare(queue='ticker_db',durable=True)
    channel.queue_declare(queue='ticker_stream_service',durable=True)
    channel.queue_declare(queue='orderbook_db',durable=True)
    channel.queue_declare(queue='orderbook_stream_service',durable=True)
     
    #Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
    channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
    channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
    channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
    channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
    channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
    channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
    

    【讨论】:

    • “吃掉所有消息”队列丢失了,据我说消息仍然不会到达“迟到”的订阅者
    • 解释一下?它肯定回答了 OP 问题并且有效。让你的 cmets 更有建设性
    • 这实际上可以工作 @KurtPattyn 和 @flyer 因为你可以随时为 Eat All Messages 创建一个新的消费者,可以从那里“恢复”未处理的消息,并将它们路由到正确的位置跨度>
    • @Kostanos 所说的,只是添加:恢复的消费者不能消费消息(没有自动确认,一旦你看到所有消息就关闭与该队列的连接)。这样您就可以使用 rabbitmq 作为事件存储 - 不确定他们是否有意。
    • 这“有味道”。正如 mbx 所写,这将 rabbitmq 配置为一种事件存储,这不是它应该如何使用的,恕我直言。而是考虑将 Kafka 用于您的用例。 Brian Kelly 的回答完美地解释了这一点。
    【解决方案3】:

    您的情况似乎是“消息持久性”。

    RabbitMQ Tutorials docs 开始,您需要将queuemessages 标记为持久(以下代码为C# 版本。对于其他语言,您可以更喜欢here)。

    1. 首先,在 Publisher 中,您需要确保 queue 能够在 RabbitMQ 节点重新启动后继续存在。为此,我们需要将其声明为耐用的:
    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         ....);
    
    1. 其次,在Consumer中,您需要将消息标记为持久 - 通过将IBasicProperties.SetPersistent 设置为true。
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    

    【讨论】: