【问题标题】:ActiveMQ: How to send same group of messages to one consumer threadActiveMQ:如何将同一组消息发送到一个消费者线程
【发布时间】:2020-06-13 16:39:10
【问题描述】:

我们在我们的项目中使用 ActiveMQ,我们有一个要求,即一组 属于一个组的消息应该由一个消费者消费 线程。

例如,我们有 3 条 (M1, M2, M3) 属于 A 的消息和 3 条 (M4, M5, M6) 属于 B 的消息。

我们的要求是:属于 A 的所有消息都应该被 消费者线程 1 和属于 B 的所有消息都应该被 消费者线程 2.

根据 ActiveMQ 文档,根据我们的理解,我们使用了 ActiveMQ 中的 JMXGroupId 概念。例如,我们将JMSXGroupId 设置为 123 用于属于 A 的所有消息(M1、M2、M3),234 用于所有 属于 B 人的消息(M4、M5、M6)。

JMXGroupId 在某些情况下属于 A 和 B(M1、M2、M3、M4、M5 和 M6)的所有消息都将发送到同一消费者线程 1。但在我们的要求中,它应该发送到两个不同的线程。

【问题讨论】:

  • 我的回答是否解决了您的问题?如果是这样,请将其标记为正确,以帮助将来可能有相同问题的其他用户。如果不是,请澄清未解决的问题。谢谢!

标签: mule activemq mulesoft


【解决方案1】:

看起来这个主题在 ActiveMQ 用户邮件列表中是 discussed before。该讨论产生了this issue,看来该问题的解决方案涉及发送带有JMSXGroupSeq 属性的消息,其值为-1。如 Jira 所述:

现在重新平衡的关键是组负载是消费者优先级的一部分,它是使用负序号属性定期终止组。 message.setIntProperty("JMSXGroupSeq", -1);

不幸的是,此实现似乎存在问题,在this issue 上进行了总结。该问题在 2010 年末(即 10 年前)被报告,此后似乎没有采取任何措施来解决它。

因此,我为您看到了三种不同的选择:

  1. 自己修复ActiveMQ code-base 中的问题。这是开源软件的一大优势 - 您可以自己修复错误并实现所需的功能,而无需等待其他任何人。
  2. 移至ActiveMQ Artemis,它已经支持此功能,如the documentation 中所述。 ActiveMQ Artemis 是 ActiveMQ 的下一代代理,当它达到足够的功能奇偶性时,它将成为 ActiveMQ 6。 ActiveMQ Artemis 支持与 ActiveMQ 5.x 相同的所有协议,因此您无需更改任何客户端。
  3. 放弃消息分组,只需在您的消费者上使用选择器。例如,属于个人 1 的每条消息都可以有一个名为 personIndex 的属性,其值为 1,并且应该接收这些消息的消费者可以使用像 personIndex = 1 这样的选择器,然后属于个人 2 的每条消息都可以有一个名为personIndex 的属性值为2,并且应该接收这些消息的消费者可以使用personIndex = 2 之类的选择器。

【讨论】:

  • 感谢您的回复。我们曾考虑过较早实施选项 3,但我们面临的挑战是我们可能会收到各种与人相关的消息,因此,如果我需要设置索引,我可能不得不使用一些巨大的数字,而且我需要创建那么多的入站每个精细的连接器使用一个选择器过滤器。
  • 我需要在 mule 中创建类似的东西
  • 如果您看到一些入站端点会我的代码呈指数级增长。
  • 我猜这排除了选项 3,因此将选项 1 和 2 作为潜在的解决方案。
猜你喜欢
  • 2013-09-08
  • 1970-01-01
  • 2014-10-23
  • 1970-01-01
  • 2020-12-09
  • 2013-08-01
  • 2021-11-25
  • 1970-01-01
  • 2016-06-08
相关资源
最近更新 更多