【发布时间】:2012-08-16 09:13:58
【问题描述】:
我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,从队列中消费消息。我们称之为“Test.Queue” 我将此代码部署在 4 台不同的机器上,所有机器都配置为同一个 ActiveMQ 实例,以处理来自同一个“Test.Queue”队列的消息。
一旦所有 4 台机器都启动并运行,我将最大消费者大小设置为 20,我看到队列中的消费者数量为 80(4 * 最大消费者大小 = 80)
当生成并发送到队列的消息增加时,一切都很好。
当有 1000 条消息并且在 80 个消费者中,假设其中一个被卡住了,它会冻结 Active MQ 以停止向其他消费者发送消息。
所有消息都永远卡在 ActiveMQ 中。
由于我有 4 台机器,最多有 80 个消费者,我不知道哪个消费者没有确认。
我停止并重新启动所有 4 台机器,当我停止有坏消费者卡住的机器时,消息再次开始流动。
我不知道如何配置 DefaultMessageListenerContainer 以放弃不良消费者并立即向 ActiveMQ 发出信号以开始发送消息。
即使没有 Spring,我也能够创建如下场景:
- 我生成了多达 5000 条消息并将它们发送到“Test.Queue”队列
我创建了 2 个消费者(消费者 A、B)和一个消费者 B onMessage() 方法,我让线程休眠了很长时间( Thread.sleep(Long.MAX_VALUE)) 的条件类似于当前时间 % 13 为 0 时,然后将线程置于睡眠状态。
运行这两个消费者。
- 去Active MQ,发现队列有2个消费者。
- A 和 B 都在处理消息
- 在某个时间点,消费者 B 的 onMessage() 被调用,当当前时间 %13 为 0 的条件得到满足时,它使线程进入休眠状态。
- 消费者 B 卡住,无法向代理确认
- 我回到 Active MQ Web 控制台,仍然看到消费者为 2,但没有消息出列。
- 现在我创建了另一个消费者 C 并运行它来消费。
- 只有 ActiveMQ 中的消费者数量从 2 上升到 3。
- 但是消费者 C 没有消费任何东西,因为代理未能发送任何包含它们的消息,因为它仍在等待消费者 B 确认它。
- 我还注意到消费者 A 没有消费任何东西
- 我去杀死消费者 B ,现在所有消息都被耗尽了。
假设 A、B、C 由 Spring 的 DefaultMessageListenerContainer 管理,我如何调整 Spring DefaultMessageListenerContainer 以在它未能确认 X 秒后将坏消费者从池中移除(在我的情况下是消费者 B),确认立即代理,这样代理就不会永远持有消息。
感谢您的宝贵时间。
如果我能解决这个问题,不胜感激。
【问题讨论】:
-
也许您应该更改问题的标题以表明这是一个 activemq 问题
-
您是否对队列进行了任何特定配置,还是只是默认配置?例如,确保消息的顺序
-
@gkamal 你认为是 ActiveMQ 的问题吗?我们如何解决这个问题,因为当其中一个消费者卡住并且未能确认代理时,它会停止向其他消费者发送消息?是否有解决该问题的方法?
-
@gkamal