【问题标题】:RabbitMQ grouping messages as one message ie coalescing messagesRabbitMQ 将消息分组为一条消息,即合并消息
【发布时间】:2013-02-25 15:52:15
【问题描述】:

我试图了解在 RabbitMQ 中合并或分块传入消息的最佳方法(直接使用 Spring AMQP 或 Java 客户端)。

换句话说,我想将 100 条传入消息合并为 1,然后以可靠的方式将其重新发送到另一个队列(正确的是 ACKed 方式)。我相信这在 EIP 中被称为aggregator 模式。

我知道 Spring Integration provides an aggregator solution 但该实现看起来不是故障安全的(也就是说,它看起来必须确认并使用消息来构建合并的消息,因此如果您在执行此操作时将其关闭,您将失去消息?)。

【问题讨论】:

    标签: java rabbitmq amqp spring-integration


    【解决方案1】:

    我不能直接评论 Spring Integration 库,所以我会笼统地说 RabbitMQ。

    如果您不是 100% 相信 Aggregator 的 Spring Integration 实现并且打算自己实现它,那么我建议您避免使用 tx,它在 RabbitMQ 的底层使用事务。

    RabbitMQ 中的事务很慢,如果您正在构建高流量/吞吐量系统,您肯定会遇到性能问题。

    我建议您看一下 Publisher Confirms,它是在 RabbitMQ 中实现的 AMQP 的扩展。这里是新的时候的介绍http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

    您需要调整预取设置以获得正确的性能,请查看http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/ 了解一些详细信息。

    以上所有内容都为您提供了一些背景知识来帮助您解决问题。实现相当简单。

    创建消费者时,您需要确保将其设置为需要 ACK。

    1. 出列 n 条消息,出列时需要记下每条消息的 DeliveryTag(用于确认消息)
    2. 将消息聚合成一条新消息
    3. 发布新消息
    4. 确认每个出队的消息

    需要注意的一点是,如果您的消费者在 3 之后和 4 完成之前死亡,那么那些未 ACK 的消息将在恢复时重新处理

    【讨论】:

    • +1 Spring AMQP 已经在等待确认。我需要TX的原因是我需要能够支持事务资源同步的提交/回滚(即在db事务提交或回滚后提交消息)。我将/想要在内存事务中创建自己的事务(即在 db commit 上推送消息列表),以便我可以获得您所说的性能......但这是我必须解决的另一个问题:)跨度>
    【解决方案2】:

    如果您将 <amqp-inbound-channel-adapter/> tx-size 属性设置为 100,容器将每 100 条消息确认一次,因此这应该可以防止消息丢失。

    但是,您可能希望使聚合消息的发送(在第 100 次接收时)具有事务性,这样您就可以在入站消息的 ack 之前确认代理具有该消息。

    【讨论】:

    • 我认为 prefetchCount (org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPrefetchCount)(与 tx 大小相关)是我需要使用的。
    • 否;见SimpleMessageListenerContainer.doReceiveAndExecute()BlockingQueueConsumer.commitIfNecessary;如果容器不是事务性的,则仅确认“最后一个”交付标签(在 txSize 消息的循环中)。调整预取,如果它小于 txSize...int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize;.
    • 是的,我想我理解并且我认为我需要编写自己的 SimpleMessageListenerContainer 来传递考虑到事务的预取消息。现在消息容器是针对单个消息设计的(即没有 onMessage(Collection<Message>...) )。基本上看起来我需要自己写org.springframework.amqp.rabbit.listener.BlockingQueueConsumer
    • 我没有提到的一件事是,txSize 是接收 ATTEMPTS 的数量 - 所以,如果 receiveTimeout 很低,那么每次我们在没有找到消息的情况下循环都会导致循环计数。因此,在这种情况下,您可以确认
    猜你喜欢
    • 2020-06-16
    • 2018-04-17
    • 1970-01-01
    • 1970-01-01
    • 2020-02-18
    • 2012-10-08
    • 2021-07-28
    • 1970-01-01
    • 2016-09-24
    相关资源
    最近更新 更多