【问题标题】:Using ActiveMQ with more than one consumer instance将 ActiveMQ 与多个消费者实例一起使用
【发布时间】:2018-10-14 23:17:13
【问题描述】:

在我们的架构中,每个应用程序都有 2 个或更多容器 高可用性目的。 我们正在使用 activeMQ,我想实现以下行为。

  1. 将消息推送到队列。
  2. 此消息将仅由 *one *container(第一个 会读的)。
  3. 一旦消息处理成功,消费者将更新此消息 消息可以被确认和忽略
  4. 我尝试使用事务提交并使用 CLIENT_ACKNOWLEDGE 在这两种情况下,两个消费者都收到了消息并进行了处理。

我希望只有一个消费者根据可用性处理每条消息。 我们的实现是用 Java 实现的。

请分享实现方式。

这是我的代码示例

 final Connection consumerConnection = connectionFactory.createConnection();
    consumerConnection.start();
    // Create a session.
    final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final Destination consumerDestination = consumerSession.createQueue(queueName);


    // Create a message consumer from the session to the queue.
    final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    // Begin to wait for messages.
    Queue queue = consumerSession.createQueue(queueName);
    QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
    Enumeration msgs = queueBrowser.getEnumeration();
    while (msgs.hasMoreElements()) {
        //do your things here

        ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

        if (message == null)
            continue;

        //handle message
        System.out.println("Message received in : " + message);
        try {
            String text = message.getText();
            JSONObject messageJson = new JSONObject(text);
            consumer.receive(1000);

            String responseString = handleMessage(messageJson);

            message.acknowledge();

谢谢 摩西

【问题讨论】:

  • 假设您实际上使用的是队列(而不是主题),并且还假设您正在适当地确认消息(并且没有将其取消回队列以便其他消费者可以使用)然后一切应该按预期工作。你能证实这两个假设吗?您可以在创建会话和确认消息的位置分享您的客户端代码吗?
  • 谢谢@JustinBertram 我用我的代码更新了问题

标签: java activemq


【解决方案1】:

这里的问题是您正在确认来自QueueBrowser 实例的消息,这不会产生任何影响,因为浏览器仅用于浏览消息而不是使用它们。

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
    ...
    ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
    ...
    try {
        ...
        message.acknowledge(); // this does nothing

正在实际上创建了一个真正的消费和调用consumer.receive(1000),但是您正在丢弃receive() 返回的Message 实例。这是Message 实例,您必须确认它才能实际使用队列中的消息。

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
    ...
    try {
        ...
        Message actualMessage = consumer.receive(1000); // acknowledge this!
        ...

另一个重要提示...队列浏览器接收到的消息不能保证是队列的静态快照。因此,假设您对consumer.receive(1000) 的调用实际上是来自队列浏览器的相同 消息,这是很危险的。在我看来,您应该重新设计此逻辑以仅使用 MessageConsumer 并删除 QueueBrowser

【讨论】:

  • 谢谢你,但是当我正在使用这种方法时,消费者.recieve 消息不再在队列中。这意味着如果我未能将其发送到下一步,它将丢失。我如何向这个场景添加提交级别,所以只有在将其发送到下一步后,它才会从队列中删除。
  • 由于您使用的是 CLIENT_ACKNOWLEDGE 模式,因此当 consumer.receive(1000) 返回时,应删除消息,如 JMS 规范所示。您必须明确确认它返回的消息或稍后在同一会话中返回的消息。如果消息正在被确认,那么这是 ActiveMQ 中的一个错误。但是,我怀疑是否存在错误,因为该项目的自动化单元测试会发现它。我鼓励您对此进行彻底调查,以确保按照您的建议发生这种情况。出于好奇,您使用的是什么版本的 ActiveMQ?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-01-10
  • 2012-04-18
  • 2016-06-08
  • 2020-06-17
  • 1970-01-01
  • 2022-01-23
  • 1970-01-01
相关资源
最近更新 更多