【问题标题】:Camel, Akka, JMS and deferred message acknowledgement: can I acknowledge a previously processed message to the broker?Camel、Akka、JMS 和延迟消息确认:我可以向代理确认之前处理的消息吗?
【发布时间】:2016-07-11 20:58:39
【问题描述】:

我正在使用 Akka(最新稳定版本)、akka-camel 和 JMS(就本次对话而言,假设它是 ActiveMQ,但理想情况下,解决方案应该是通用的)。

用例

我有以下用例。在队列 @9​​87654324@ 我收到这样的消息:

time:   1     2    3    4    5    6

      | A1 | B1 | C1 | C2 | A2 | B2 | .... 
         ^                        ^
       first                     latest

我的最终目标是在(A1,A2)(B1,B2) 等中将它们配对;尽管存在重复消息和未传递消息等复杂情况,但复杂之处在于我必须确保在匹配并处理整对消息之前,代理将保留所有未确认的消息。

示例

4,我收到并处理了 4 条消息,并成功处理了配对(C1, C2),但是我仍然无法向代理确认任何内容,因为A1B1 仍然不匹配和待处理,并且在 JMS 确认C2 表示确认到C2 之前的所有消息。事实上,我可以发回的第一个确认是在5 时收到A2:此时我可以确认A1(并且只有A1,因为B1 仍在等待中)。

问题

现在,我似乎不知道如何通过akka-camel 进行这种延迟异步确认。我一直在网上阅读,虽然我可以找到有关如何手动确认消息的解释(docsexample),但没有任何内容显示如何向代理确认先前处理的消息。

import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure

class Consumer3 extends Consumer {
  override def autoAck = false

  def endpointUri = "jms:queue:test"

  def receive = {
    case msg: CamelMessage =>
      sender() ! Ack
      // on success
      // ..
      val someException = new Exception("e1")
      // on failure
      sender() ! Failure(someException)
  }
}

在这种情况下,Ack 是一个 object,它的语义实际上就是:我确认当前消息,而我需要类似 Message X 现已确认 strong>,其中 X 是一些以前的消息,但不一定是当前消息。

这个用例是否支持或可通过akka-camel 支持,还是我应该自己构建它?

谢谢

【问题讨论】:

    标签: scala jms akka activemq akka-camel


    【解决方案1】:

    听起来你想要的是activemq的ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE

    遗憾的是,INVIDUAL_ACKNOWLEDGE 不是 JMS 规范的一部分,但根据this 的帖子,它在每个队列的基础上被广泛采用。 (我实际上没有检查过)

    由于它不在规范中,akka 不支持开箱即用,但它肯定会做出很好的贡献!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-01-13
      • 1970-01-01
      • 2021-04-18
      • 1970-01-01
      • 2010-12-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多