【发布时间】:2021-01-31 21:30:02
【问题描述】:
我正在尝试使用连接到 IBM MQ 的 Apache Camel 在 Java Spring Boot 中完成一个事务性 JMS 客户端。此外,当消息处理失败时,客户端需要应用指数退避重新传递行为。原因:来自 MQ 的消息需要处理并转发到可能停机维护数小时的外部系统。使用事务来保证至少一次处理保证对我来说似乎是合适的解决方案。
我已经研究了这个主题很多小时,但未能找到解决方案。我将从我目前拥有的开始:
@Bean
UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
throws IOException {
MQConnectionFactory factory = new MQConnectionFactory();
factory.setCCDTURL(tabFilePath);
UserCredentialsConnectionFactoryAdapter adapter =
new UserCredentialsConnectionFactoryAdapter();
adapter.setTargetConnectionFactory(factory);
adapter.setUsername(userName);
bentechConnectionFactoryAdapter.setPassword(password);
return adapter;
}
@Bean
PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
return txMgr;
}
@Bean()
CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,
@Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
return new CamelContextConfiguration() {
@Override
public void beforeApplicationStart(CamelContext context) {
JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter, txMgr);
// required for consumer-level redelivery after rollback
jmsComponent.setCacheLevelName("CACHE_CONSUMER");
jmsComponent.setTransacted(true);
jmsComponent.getConfiguration().setConcurrentConsumers(1);
context.addComponent("jms", jmsComponent);
}
@Override
public void afterApplicationStart(CamelContext camelContext) {
// Do nothing
}
};
}
// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
.transacted()
.("direct:processMessage");
...
我能够通过集成测试验证事务行为。如果在消息处理期间发生未处理的异常,事务将回滚并重试。问题是,它会立即每秒重试几次,这可能会给 IBM MQ 管理器和外部系统造成很大的负载。
对于 ActiveMQ,重新交付策略很容易做到,网上有很多例子。 ActiveMQConnectionFactory 有一个 setRedeliveryPolicy 方法,意思是 ActiveMQ 客户端库内置了重新传递逻辑。据我所知,这与 Camel 的 Transactional Client EIP 的文档一致,其中指出:
事务模式下的重新交付不是由 Camel 处理,而是由支持系统(事务管理器)处理。在这种情况下,您应该求助于后备系统如何配置重新传递。
我绝对无法弄清楚如何为 IBM MQ 实现相同的目标。 IBM 的MQConnectionFactory 不支持重新交付策略。事实上,在 MQ 知识中心搜索 redeliverypolicy 会准确地找到...击鼓... 0 次点击。我什至浏览了一下 MQConnectionFactory 的实现,也没有发现任何东西。
我研究的另一个支持系统是JmsTransactionManager。搜索“jmstransactionmanager 重新交付策略”或“jmstransactionmanager 指数退避”也没有发现任何有用的信息。有一些关于 TransactionTemplate 和 AbstractMessageListenerContainer 的讨论,但 1) 我没有看到与重新交付政策有任何联系,2) 我无法弄清楚这些政策如何与 Camel 和 JMS 交互。
Sooo,有人知道如何使用 Apache Camel 和 IBM MQ 实施指数退避重新交付策略吗?
结束说明:Camel 支持errorHandler 和onException 上的重新交付政策不与事务/连接支持系统中的重新交付政策相同。这些处理程序使用处于任何状态的“Exchange”对象在故障点重试,而不会从路由开始回滚和重新处理消息。该事务在整个重试期间保持活动状态,并且仅当errorHandler 或onException 放弃时才会发生回滚。对于可能持续数小时的重试,这不是我想要的。
【问题讨论】:
-
这个答案有帮助吗? stackoverflow.com/questions/63292922/…
-
@JoshMc 不确定我将如何在 CamelContext 的上下文中访问 DMLC 并与之交互。然而,那个帖子确实给了我一个想法。我现在正在研究 RouterPolicy 概念和 ThrottlingInflightRoutePolicy 实现。也许我可以使用它来实施我自己的退避政策。
标签: apache-camel jms ibm-mq spring-jms mq