【发布时间】:2017-11-13 12:45:35
【问题描述】:
在 Spring Integration 中,我们有一个如下所示的 Setup:
--->
--->
(dispatcher) Messages --> Gateway ----> QueueChannel ---> MessageHandler (worker)
--->
--->
所以我们有一个 Dispatcher 线程,它从 MQTT-Broker 获取消息并将它们转发到队列中。队列的轮询器提供了一个任务执行器,因此消费者是多线程的。 我们设法实现了所有功能。所以刚才描述的设置已经实现。
现在为了保证不会丢失数据,我们要做两件事:
1.: 我们希望我们的队列能够持久化数据,因此当程序异常关闭时,队列中的所有数据仍然存在。 这也适用于我们,我们使用 MongoDB 作为数据库,因为我们在您的文档中读到这是推荐的方法。
2.: 我们要保证的第二件事是工作线程是事务性的。因此,只有当工作线程正确返回时,消息才会从队列中永久删除(因此也会从持久性 MessageStore 中删除)。如果程序在处理消息期间(由工作线程)关闭,则消息在下一次启动时仍将在队列中。 此外,例如,如果工作人员在处理消息期间抛出异常,它将被放回队列中。
我们的实施:
如前所述,程序的基本设置已经实现。然后,我们使用队列的消息存储实现扩展了基本实现。
队列频道:
@Bean
public PollableChannel inputChannel(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new QueueChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "inputChannel"));
}
由 Messagestore 支持:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
匹配的轮询器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
return poll;
}
执行者:
private Executor consumer = Executors.newFixedThreadPool(5);
我们尝试了什么? 正如现在所解释的,我们希望使用事务功能扩展此实现。我们尝试像 here 解释的那样使用 setTransactionSynchronizationFactory,但它不起作用(没有出现错误或任何其他问题,但行为仍然与我们添加 TransactionSynchronizer 之前一样):
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
BeanFactory factory = mock(BeanFactory.class);
ExpressionEvaluatingTransactionSynchronizationProcessor etsp = new ExpressionEvaluatingTransactionSynchronizationProcessor();
etsp.setBeanFactory(factory);
etsp.setAfterRollbackChannel(inputChannel());
etsp.setAfterRollbackExpression(new SpelExpressionParser().parseExpression("#bix"));
etsp.setAfterCommitChannel(inputChannel());
etsp.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
DefaultTransactionSynchronizationFactory dtsf = new DefaultTransactionSynchronizationFactory(etsp);
poll.setTransactionSynchronizationFactory(dtsf);
return poll;
}
在 Spring 集成中实现我们的要求的最佳方式是什么?
编辑: 按照答案中的建议,我选择使用 JdbcChannelMessageStore 来执行此操作。所以我尝试将here (18.4.2) 描述的 XML 实现转换为 Java。我不太确定该怎么做,这是我迄今为止尝试过的:
我创建了 H2 数据库并在上面运行here 显示的脚本。
已创建 JDBCChannelMessageStore Bean:
@Bean
public JdbcChannelMessageStore store() {
JdbcChannelMessageStore ms = new JdbcChannelMessageStore();
ms.setChannelMessageStoreQueryProvider(queryProvider());
ms.setUsingIdCache(true);
ms.setDataSource(dataSource);
return ms;
}
已创建 H2ChannelMessageStoreQueryProvider
@Bean
public ChannelMessageStoreQueryProvider queryProvider() {
return new H2ChannelMessageStoreQueryProvider();
}
调整轮询器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() throws Exception {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
poll.setAdviceChain(Collections.singletonList(transactionInterceptor()));
return poll;
}
自动装配我的 PlaatformTransactionManager:
@Autowired
PlatformTransactionManager transactionManager;
并从 TransactonManager 创建了 TransactionInterceptor:
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRED)
.build();
}
【问题讨论】:
标签: java spring spring-integration