【问题标题】:Consume messages from ActiveMQ one message at a time一次使用一条来自 ActiveMQ 的消息
【发布时间】:2012-10-25 00:46:49
【问题描述】:

我想这样做:

  1. ActiveMQ 向客户端发送消息。

  2. 客户端处理完消息后向ActiveMQ发送确认。

  3. 如果客户端已关闭或未确认消息,则此消息将保留在队列中。不会传递所有其他消息。

  4. 服务器一次发送一条消息,客户端一次处理一条消息。除非第一条消息已被确认,否则不会传递下一条消息。

有没有办法做到这一点?

【问题讨论】:

  • 其实activeMQ是这样工作的(一般来说),你测试过吗?
  • 在我们的例子中,它不是这样工作的。我们一下子收到了所有消息,而不是一一收到。我怀疑服务器配置可能不正确。将进行更多测试并通知您。

标签: activemq


【解决方案1】:

听起来您正在寻找的是prefetch 的下限或零限制与客户确认或个人确认的Ack mode 的组合。零预取将使您的客户端本质上是基于拉取的消费者,如果您的客户端在断开连接时无法确认已发送的消息,则将 ack 设置为客户端将导致代理重新传递消息。

【讨论】:

    【解决方案2】:

    关于失败的缺点,这就是activeMQ的工作原理。

    如果“失败/错误确认”发生在业务逻辑层,那么您应该使用transactions

    警告不是有效的代码(范例)

    // at your Listener
    public void onMessage(Message message)
    {
        if (isValid(message))
        {
            connection.ack();
            commit();
            return;
        }
        rollback();
    }
    

    关于消息限制和持久性,以下目标策略配置将为您提供帮助。

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="10mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>
    

    恕我直言,我也会使用 MySQL 来实现这种持久性

        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
        </persistenceAdapter>
    

    另外我建议你use Camel 来配置你的throttler 你想要的。

    【讨论】:

    • 很好地总结了我从教程、书籍中学到的东西,但很难将这些部分放在一起。干得好。
    • @Evalon 你提到了&lt;destinationPolicy&gt; 对应的代码。我需要将它保存在哪个 XML 文件中。
    • @Hanumath 默认是activemq.xml&lt;broker&gt; 元素内。
    【解决方案3】:

    尝试以下...

    • 设置队列预取=0
    • 使用 TRANSACTED 确认
    • 单线程消费者(concurrentConsumers=1/maxConcurrentConsumers=1 等)
    • 设置 DispatchAsync = false

    【讨论】:

      猜你喜欢
      • 2013-05-30
      • 2022-01-23
      • 2023-03-14
      • 1970-01-01
      • 2014-07-18
      • 1970-01-01
      • 2016-04-24
      • 2014-11-17
      • 2012-05-30
      相关资源
      最近更新 更多