【发布时间】:2020-01-15 15:25:47
【问题描述】:
我有两条骆驼路线,用 XML 配置并粘贴在下面:-
路线 1:
<camel:route id="statementsArchivingPollRoute">
<camel:from uri="timer://tempQueue?fixedRate=true&period=30s&delay=30s"/>
<camel:transacted ref="PROPAGATION_REQUIRED">
<camel:process ref="statementsArchivingRequestZipProcessor"/>
<camel:choice>
<camel:when>
<camel:simple>${body.size} >= 1</camel:simple>
<camel:split>
<camel:simple>${body}</camel:simple>
<camel:marshal ref="archiveFileInterfaceMetadataMapper"/>
<camel:to pattern="InOnly"
uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req?jmsMessageType=Text"/>
</camel:split>
<camel:log loggingLevel="INFO" message="Archiving content was processed"/>
</camel:when>
<camel:otherwise>
<camel:log loggingLevel="INFO" message="No archiving content to process"/>
</camel:otherwise>
</camel:choice>
</camel:transacted>
</camel:route>
路线 2:
<camel:route id="statementsArchivingBulkIngestionRequestRoute">
<camel:from uri="activemq:{{ccs.activemq.queue.prefix}}.sr.archive.bulk.ingestion.req"/>
<camel:throttle timePeriodMillis="4000">
<camel:constant>1</camel:constant>
<camel:setExchangePattern pattern="InOnly"/>
<camel:unmarshal ref="archiveFileInterfaceMetadataMapper"/>
<camel:bean ref="archiveFileEntryTransformer" method="transform"/>
<camel:setHeader headerName="CamelHttpMethod">
<camel:constant>POST</camel:constant>
</camel:setHeader>
<camel:toD uri="{{ccs.bulk.ingestion.service.ingest.archive.file}}"/>
</camel:throttle>
</camel:route>
第一个路由中的处理器返回一个请求对象列表。然后拆分该列表,并将每个请求编组并放置在队列中。
第二条路由监听这个队列。当它使消息出队时,它会解组它,执行转换,然后使用它向另一个服务发送发布请求。我正在限制这条路由,使其每秒只处理一条消息,以免压垮下游服务。
当列表只包含几个请求时,这一切都很好,因此只有几条消息进入队列,但是当列表中有很多项目时,路由 2 超时并出现以下日志条目:
Atomikos:12] c.a.icatch.imp.ActiveStateHandler : Timeout/setRollbackOnly of ACTIVE coordinator !
超时会导致该过程自行重复,下游服务最终会被每条消息多次调用,而不是仅调用一次。
我不明白为什么调用路由 2 的次数会导致它超时。我认为将为从 activemq 出队的每条消息启动一个路由实例。如果一条消息需要很长时间才能完成,那么我会理解,但显然超时是基于所有消息出队的累积时间。
我对 Camel 还很陌生,从架构的角度来看,我显然误解了一些东西。对于如何阻止这些超时发生的任何指导,我将非常感激。感谢您的阅读。
【问题讨论】:
-
我不是 Camel 交易方面的专家,但我认为您遇到了问题,因为您进行的交易包括您所做的一切。通常,当您有一个消费者并使其具有事务性时,该交易是消费者收到的每个交换。在这种情况下,您的事务交换是按计时器实例进行的。它需要是事务性的吗?如果你停止它是事务性的,它可以正常工作吗?
-
@Screwtape 这是一个很好的建议,我认为你是对的。我删除了事务标记并再次尝试,但出现以下错误:“AtomikosTransactionRequiredJMSException:您正在使用的 JMS 会话需要调用线程的 JTA 事务上下文,但没有找到。”所以它显然需要这个。所以我把它放回去,但删除了“PROPAGATION_REQUIRED”参考。不幸的是,这给出了与原始帖子完全相同的行为!但我绝对认为这是要寻找的方向。非常感谢 - 我会对此进行更多搜索。
-
嗯...鉴于您的第一条路线在队列生产者上显示“InOnly”,那么我没想到您需要完成第二条路线中的所有内容才能在第一条路线中完成交易。我注意到您的计时器设置为固定速率 - 可能是您有太多记录要写入队列,以至于您在下一个交换开始之前还没有完成第一个交换?这可能是个问题。否则我很难过。
-
@Screwtape 是的,我就是这么想的!不,队列中只有几条记录,所以我也很困惑。但是非常感谢您的关注-您至少给了我研究的方向! :)
-
鉴于 1 在没有 2 的情况下有效,我认为您需要掌握交易策略。请参阅:camel.apache.org/manual/latest/…,这意味着您遇到的问题与 PROPAGATION_REQUIRED 的预期完全一样,因为它将父事务扩展到目标消费者。也许您需要 PROPAGATION_REQUIRES_NEW,或者可能不支持,这并不清楚如何从 Spring XML 中指定。
标签: apache-camel activemq atomikos