【问题标题】:Camel transacted JMS component doesn't work with AggregatorCamel 事务处理的 JMS 组件不适用于聚合器
【发布时间】:2021-06-09 10:20:13
【问题描述】:

我发现带有选项 transacted=true 的 JMS 组件不适用于 Aggregator。我的路由在到达聚合器之前是事务性的,在聚合器之后我的路由不再是事务性的。如果我理解得很好,这是因为聚合器正在为每个交换转换启动新线程,并且这种行为会停止第一个事务线程,从而导致事务提交和确认发送到队列。所以消息不再在队列中。这是正确的吗?

我需要对 Camel 做的是从队列 (RabbitMQ) 中读取所有消息,然后将它们全部转换到一个列表中,最后将数据从列表写入文件。所有这些都必须是事务性的,如果路由中发生错误,消息应该保留在队列中。 基本上我到目前为止所做的是从队列中读取 jms 组件(使用选项 transacted=true),然后使用聚合器聚合所有消息,然后发送到文件。

我的代码的简短示例:

from("jms:queue:someQUeue?transacted=true")
    // some more processing in the route                          <- route is transactional here
    .aggregate((constant(true)), new MyAggregationStrategy())
    .completionInterval(500)
    .process(new Processor() {                                    <- route is not transactional anymore
               // some processing
    })
    .toD("file://C/tmp...")

我认为完美的组件是 Simple JMS-batch,但我们使用的是 Camel 3.8,并且此组件在此版本和更新版本中不再存在.为什么?有什么新东西代替这个吗?

另外 SJMSSJMS2 对我没有帮助,当我使用它们时,消费者确实会读取所有消息(队列中的所有消息都未确认 - 等待确认),但是通过路由的处理还是一条一条的消息。我怎样才能使它成为与所有消息列表的一次交换?

是否有其他解决方案? (例如,制作一些自定义聚合器 - 但如何?)

【问题讨论】:

    标签: apache-camel camel-jms


    【解决方案1】:

    据我所知,这是正确的!如果您的意图是实现保证交付,在这种情况下,您最好使用persistent aggregation repository

    【讨论】:

    • 感谢您的回答法比奥。但我认为这并不能解决所描述的问题。使用持久聚合存储库,我在聚合器之后发生的处理中是事务安全的,如果该处理出现故障,我的聚合交换将仅返回到聚合器存储库。但我希望在聚合器处理器之前将该​​交换返回到 jms 队列,这是我的起点。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-11
    • 2019-02-26
    • 2013-02-09
    • 1970-01-01
    相关资源
    最近更新 更多