【问题标题】:SimpleMessageListenerContainer bulk message processingSimpleMessageListenerContainer 批量消息处理
【发布时间】:2016-03-30 22:17:43
【问题描述】:

我有一个传入数据流,它作为单独的消息发送到 RabbitMQ。

我想将这些发送到需要一批消息的服务。当我有一批 1000 条消息或 5 秒过期时,我需要将请求发送到服务。这可以使用 SimpleMessageListenerContainer 吗?

SimpleMessageListenerContainer 支持事务,但是这对 5 秒超时没有帮助。我确实查看了方法 doReceiveAndExecute(BlockingQueueConsumer consumer) 和“receiveTimeout”,但由于这个变量在事务循环中,我最终可能会等待每条消息 5 秒(1000*5 秒= 83 分钟)。

我目前有一个通道感知侦听器,它将消息批处理到一个批量处理器中,该处理器将管理我的超时和队列长度。 SimpleMessageListenerContainer 设置为手动确认。但是,由于侦听器在消息实际发送到服务之前返回,当我在通道关闭时确认消息时偶尔会遇到问题。

我曾考虑编写自己的 ListenerContainer,将整个 BlockingQueueConsumer 发送到 Listener。这是唯一的解决方案还是有人已经设法做类似的事情?

【问题讨论】:

    标签: java spring rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    您可以使用ChannelAwareMessageListener, 设置acknowledgeMode=MANUAL;在侦听器中累积交付;启动一个计时器(计划任务)以在 +5 秒内执行并保持对通道的引用。当新的交付到达时,取消任务,将新的交付添加到集合中。

    当 1000 个交付到达时(或计划任务触发);调用您的服务;然后使用channel.basicAck()(多个)确认处理后的消息。

    您将需要处理一些竞争条件,但这应该很容易。也许另一个批处理队列最容易,因为其他线程正在等待批处理到达该队列。

    编辑

    从 2.2 开始,SimpleMessageListenerContainer 支持本地传送批量消息 - 请参阅 Batched Messages

    从 2.2 版开始,SimpleMessageListeneContainer 可用于在消费者端(生产者发送离散消息)创建批次。

    设置容器属性consumerBatchEnabled 以启用此功能。 deBatchingEnabled 也必须为真,以便容器负责处理这两种类型的批次。当 consumerBatchEnabled 为 true 时,实现 BatchMessageListenerChannelAwareBatchMessageListener。有关将此功能与@RabbitListener 一起使用的信息,请参阅@RabbitListener with Batching

    【讨论】:

    • 这和我已经拥有的几乎一样。我希望有一些东西可以为我处理复杂性。我已经回到我的代码并修复了我的一些竞争条件。我确实使用了您对批处理的另一个队列的建议。
    • 您可能会发现new idle container events feature that we are working on 很有用。如果idleEventInterval 中没有收到消息,容器线程将发布应用程序事件。它应该避免大多数竞争条件,因为事件是在容器线程上发布的。 Docs here.
    • @Gary Russell,我还需要批量接收消息,以便以批处理模式插入它们。但我的是 Websphere MQ,如果有办法实现这一点,请建议。 stackoverflow.com/questions/55107244/…
    • 从 2.2 开始,SimpleMessageListenerContainer 支持本地发送批量消息 - 请参阅 Batched Messages
    猜你喜欢
    • 2016-03-21
    • 1970-01-01
    • 2019-07-02
    • 2015-06-15
    • 2021-10-09
    • 2019-05-26
    • 1970-01-01
    • 2019-12-15
    • 1970-01-01
    相关资源
    最近更新 更多