【问题标题】:Spring DefaultMessageListenerContainer And ActiveMQSpring DefaultMessageListenerContainer 和 ActiveMQ
【发布时间】:2012-08-16 09:13:58
【问题描述】:

我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,从队列中消费消息。我们称之为“Test.Queue” 我将此代码部署在 4 台不同的机器上,所有机器都配置为同一个 ActiveMQ 实例,以处理来自同一个“Test.Queue”队列的消息。

一旦所有 4 台机器都启动并运行,我将最大消费者大小设置为 20,我看到队列中的消费者数量为 80(4 * 最大消费者大小 = 80)

当生成并发送到队列的消息增加时,一切都很好。

当有 1000 条消息并且在 80 个消费者中,假设其中一个被卡住了,它会冻结 Active MQ 以停止向其他消费者发送消息。

所有消息都永远卡在 ActiveMQ 中。

由于我有 4 台机器,最多有 80 个消费者,我不知道哪个消费者没有确认。

我停止并重新启动所有 4 台机器,当我停止有坏消费者卡住的机器时,消息再次开始流动。

我不知道如何配置 DefaultMessageListenerContainer 以放弃不良消费者并立即向 ActiveMQ 发出信号以开始发送消息。

即使没有 Spring,我也能够创建如下场景:

  1. 我生成了多达 5000 条消息并将它们发送到“Test.Queue”队列
  2. 我创建了 2 个消费者(消费者 A、B)和一个消费者 B onMessage() 方法,我让线程休眠了很长时间( Thread.sleep(Long.MAX_VALUE)) 的条件类似于当前时间 % 13 为 0 时,然后将线程置于睡眠状态。

  3. 运行这两个消费者。

  4. 去Active MQ,发现队列有2个消费者。
  5. A 和 B 都在处理消息
  6. 在某个时间点,消费者 B 的 onMessage() 被调用,当当前时间 %13 为 0 的条件得到满足时,它使线程进入休眠状态。
  7. 消费者 B 卡住,无法向代理确认
  8. 我回到 Active MQ Web 控制台,仍然看到消费者为 2,但没有消息出列。
  9. 现在我创建了另一个消费者 C 并运行它来消费。
  10. 只有 ActiveMQ 中的消费者数量从 2 上升到 3。
  11. 但是消费者 C 没有消费任何东西,因为代理未能发送任何包含它们的消息,因为它仍在等待消费者 B 确认它。
  12. 我还注意到消费者 A 没有消费任何东西
  13. 我去杀死消费者 B ,现在所有消息都被耗尽了。

假设 A、B、C 由 Spring 的 DefaultMessageListenerContainer 管理,我如何调整 Spring DefaultMessageListenerContainer 以在它未能确认 X 秒后将坏消费者从池中移除(在我的情况下是消费者 B),确认立即代理,这样代理就不会永远持有消息。

感谢您的宝贵时间。

如果我能解决这个问题,不胜感激。

【问题讨论】:

  • 也许您应该更改问题的标题以表明这是一个 activemq 问题
  • 您是否对队列进行了任何特定配置,还是只是默认配置?例如,确保消息的顺序
  • @gkamal 你认为是 ActiveMQ 的问题吗?我们如何解决这个问题,因为当其中一个消费者卡住并且未能确认代理时,它会停止向其他消费者发送消息?是否有解决该问题的方法?
  • @gkamal

标签: spring activemq


【解决方案1】:

这里有几个选项可以尝试...

  1. 将队列预取设置为 0 以促进更好地分布在消费者之间并减少特定消费者的“卡住”消息。见http://activemq.apache.org/what-is-the-prefetch-limit-for.html

  2. 在连接上设置“?useKeepAlive=false&wireFormat.maxInactivityDuration=20000”以在指定的非活动时间后使慢速使用者超时

  3. 设置队列策略“slowConsumerStrategy->abortSlowConsumer”...再次使慢速消费者超时

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry> 
    

【讨论】:

  • 即使将预取设置为 1 以更好地分发,消费者也有可能被卡住。有没有办法配置 DefaultMessageListenerContainer 来清理卡住的消费者并确认代理返回?就我而言,消费者在收到消息后必须在幕后做很多事情。我只在消费者需要在 1-2 小时内处理大约 400,000 条消息的日子里遇到这个问题。我不希望消费者控制另一个消费者——从某种意义上说——我不希望其他消费者在代理停止发送消息时什么都不做。
  • 预取设置为 1,这对我有帮助。但是我总是发现 1 条消息卡在代理中,当时我有 3 个消费者产生了 10,000 条消息并发送到代理,其中 1 个消费者故意进入睡眠状态。使用默认值 1000,我注意到有几千条消息卡住了。但它仍然没有解决问题,因为 1 条消息仍然卡住,这可能是一条重要消息。
  • 使用 abortSlowConsumerStrategy 的副作用是什么?假设代理中有 100k 条消息,有 50 个消费者处理它们,其中一个消费者很慢,这会影响慢消费者已经收到的消息吗?
  • 另一个想法是使用 prefetch=0 ......这不应该将消息推送给消费者,而是将它们拉取......对于你的另一个问题,我相信它们会在中止时重新传递给其他消费者(因为它们还没有被确认)
猜你喜欢
  • 2014-07-25
  • 2011-08-13
  • 2011-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-07
  • 2019-05-24
相关资源
最近更新 更多