【问题标题】:Spring Integration: Persistent and transactional QueueChannelSpring 集成:持久和事务性 QueueChannel
【发布时间】:2017-11-13 12:45:35
【问题描述】:

在 Spring Integration 中,我们有一个如下所示的 Setup

                                                     ---> 
                                                     ---> 
(dispatcher) Messages --> Gateway ----> QueueChannel ---> MessageHandler (worker)
                                                     ---> 
                                                     ---> 

所以我们有一个 Dispatcher 线程,它从 MQTT-Broker 获取消息并将它们转发到队列中。队列的轮询器提供了一个任务执行器,因此消费者是多线程的。 我们设法实现了所有功能。所以刚才描述的设置已经实现。

现在为了保证不会丢失数据,我们要做两件事:

1.: 我们希望我们的队列能够持久化数据,因此当程序异常关闭时,队列中的所有数据仍然存在。 这也适用于我们,我们使用 MongoDB 作为数据库,因为我们在您的文档中读到这是推荐的方法。

2.: 我们要保证的第二件事是工作线程是事务性的。因此,只有当工作线程正确返回时,消息才会从队列中永久删除(因此也会从持久性 MessageStore 中删除)。如果程序在处理消息期间(由工作线程)关闭,则消息在下一次启动时仍将在队列中。 此外,例如,如果工作人员在处理消息期间抛出异常,它将被放回队列中。

我们的实施:

如前所述,程序的基本设置已经实现。然后,我们使用队列的消息存储实现扩展了基本实现。

队列频道:

@Bean
public PollableChannel inputChannel(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new QueueChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "inputChannel"));
}

由 Messagestore 支持:

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

匹配的轮询器

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    PollerMetadata poll = Pollers.fixedDelay(10).get(); 
    poll.setTaskExecutor(consumer);
    return poll;
}

执行者

private Executor consumer = Executors.newFixedThreadPool(5);

我们尝试了什么? 正如现在所解释的,我们希望使用事务功能扩展此实现。我们尝试像 here 解释的那样使用 setTransactionSynchronizationFactory,但它不起作用(没有出现错误或任何其他问题,但行为仍然与我们添加 TransactionSynchronizer 之前一样):

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    PollerMetadata poll = Pollers.fixedDelay(10).get(); 
    poll.setTaskExecutor(consumer);

    BeanFactory factory = mock(BeanFactory.class);
    ExpressionEvaluatingTransactionSynchronizationProcessor etsp = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    etsp.setBeanFactory(factory);
    etsp.setAfterRollbackChannel(inputChannel());
    etsp.setAfterRollbackExpression(new SpelExpressionParser().parseExpression("#bix"));
    etsp.setAfterCommitChannel(inputChannel());
    etsp.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
    DefaultTransactionSynchronizationFactory dtsf = new DefaultTransactionSynchronizationFactory(etsp);

    poll.setTransactionSynchronizationFactory(dtsf);

    return poll;
}

在 Spring 集成中实现我们的要求的最佳方式是什么?

编辑: 按照答案中的建议,我选择使用 JdbcChannelMessageStore 来执行此操作。所以我尝试将here (18.4.2) 描述的 XML 实现转换为 Java。我不太确定该怎么做,这是我迄今为止尝试过的:

我创建了 H2 数据库并在上面运行here 显示的脚本。

已创建 JDBCChannelMessageStore Bean:

@Bean
public JdbcChannelMessageStore store() {
    JdbcChannelMessageStore ms =  new JdbcChannelMessageStore();
    ms.setChannelMessageStoreQueryProvider(queryProvider());
    ms.setUsingIdCache(true);
    ms.setDataSource(dataSource);
    return ms;
}

已创建 H2ChannelMessageStoreQueryProvider

    @Bean
    public ChannelMessageStoreQueryProvider queryProvider() {
        return new H2ChannelMessageStoreQueryProvider();
    }

调整轮询器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() throws Exception {
    PollerMetadata poll = Pollers.fixedDelay(10).get();
    poll.setTaskExecutor(consumer);
    poll.setAdviceChain(Collections.singletonList(transactionInterceptor()));

    return poll;
}

自动装配我的 PlaatformTransactionManager:

@Autowired
PlatformTransactionManager transactionManager;

并从 TransactonManager 创建了 TransactionInterceptor:

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRED)
                .build();
}

【问题讨论】:

    标签: java spring spring-integration


    【解决方案1】:

    如果您需要将队列作为事务性,您绝对应该查看事务性MessageStore。只有 JDBC 是这样的。只是因为只有 JDBC 支持事务。所以,当我们执行DELETE时,只有提交TX才OK。

    MongoDB 和任何其他 NoSQL 数据库都支持这种模型,因此您只能在回滚时使用 TransactionSynchronizationFactory 将失败的消息推送回数据库。

    更新

    @RunWith(SpringRunner.class)
    @DirtiesContext
    public class So47264688Tests {
    
        private static final String MESSAGE_GROUP = "transactionalQueueChannel";
    
        private static EmbeddedDatabase dataSource;
    
        @BeforeClass
        public static void init() {
            dataSource = new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.H2)
                    .addScript("classpath:/org/springframework/integration/jdbc/schema-drop-h2.sql")
                    .addScript("classpath:/org/springframework/integration/jdbc/schema-h2.sql")
                    .build();
        }
    
        @AfterClass
        public static void destroy() {
            dataSource.shutdown();
        }
    
        @Autowired
        private PollableChannel transactionalQueueChannel;
    
        @Autowired
        private JdbcChannelMessageStore jdbcChannelMessageStore;
    
        @Autowired
        private PollingConsumer serviceActivatorEndpoint;
    
        @Autowired
        private CountDownLatch exceptionLatch;
    
        @Test
        public void testTransactionalQueueChannel() throws InterruptedException {
            GenericMessage<String> message = new GenericMessage<>("foo");
            this.transactionalQueueChannel.send(message);
    
            assertTrue(this.exceptionLatch.await(10, TimeUnit.SECONDS));
            this.serviceActivatorEndpoint.stop();
    
            assertEquals(1, this.jdbcChannelMessageStore.messageGroupSize(MESSAGE_GROUP));
            Message<?> messageFromStore = this.jdbcChannelMessageStore.pollMessageFromGroup(MESSAGE_GROUP);
    
            assertNotNull(messageFromStore);
            assertEquals(message, messageFromStore);
        }
    
        @Configuration
        @EnableIntegration
        public static class ContextConfiguration {
    
            @Bean
            public PlatformTransactionManager transactionManager() {
                return new DataSourceTransactionManager(dataSource);
            }
    
            @Bean
            public ChannelMessageStoreQueryProvider queryProvider() {
                return new H2ChannelMessageStoreQueryProvider();
            }
    
            @Bean
            public JdbcChannelMessageStore jdbcChannelMessageStore() {
                JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
                jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(queryProvider());
                return jdbcChannelMessageStore;
            }
    
            @Bean
            public PollableChannel transactionalQueueChannel() {
                return new QueueChannel(new MessageGroupQueue(jdbcChannelMessageStore(), MESSAGE_GROUP));
            }
    
            @Bean
            public TransactionInterceptor transactionInterceptor() {
                return new TransactionInterceptorBuilder()
                        .transactionManager(transactionManager())
                        .isolation(Isolation.READ_COMMITTED)
                        .propagation(Propagation.REQUIRED)
                        .build();
            }
    
            @Bean
            public TaskExecutor threadPoolTaskExecutor() {
                ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
                threadPoolTaskExecutor.setCorePoolSize(5);
                return threadPoolTaskExecutor;
            }
    
            @Bean(name = PollerMetadata.DEFAULT_POLLER)
            public PollerMetadata poller() {
                return Pollers.fixedDelay(10)
                        .advice(transactionInterceptor())
                        .taskExecutor(threadPoolTaskExecutor())
                        .get();
            }
    
            @Bean
            public CountDownLatch exceptionLatch() {
                return new CountDownLatch(2);
            }
    
            @ServiceActivator(inputChannel = "transactionalQueueChannel")
            public void handle(Message<?> message) {
                System.out.println(message);
                try {
                    throw new RuntimeException("Intentional for rollback");
                }
                finally {
                    exceptionLatch().countDown();
                }
            }
    
        }
    
    }
    

    【讨论】:

    • 感谢您的回答。我查看了 JdbcChannelMessageStore。我试图将此处解释的 JdbcChannelMessageStore 实现重写:docs.spring.io/spring-integration/reference/html/… at 18.4.2 到 java。但我没有成功。我编辑了我的原始帖子以显示我想要做的事情。
    • 您对JdbcChannelMessageStore 所做的操作是正确的。现在您需要将PollerMetadataTransactionInterceptor 配置到其setAdviceChain 中。你根本不需要TransactionSynchronizationFactory。 (同时希望你配置那个QueueChannel使用这个JdbcChannelMessageStore
    • 队列的持久性正在工作,但我的交易仍然有问题。我正在尝试抛出新的 RuntimeException();在我的 Worker 中,但事务没有回滚(消息没有放回队列,但丢失了)。为了查看我所做的更改,我再次编辑了我的帖子。 (改编 Poller 并添加 TransactionInterceptor)
    • 在我的回答中查看更新。我完全听从了你的解释,它对我很有效。我看到回滚并且消息没有从 JDBC 存储中丢失。干杯!
    • 您的实现对我有用,但我无法让它在我的用例中运行。为什么叫 this.serviceActivatorEndpoint.stop(); ?
    【解决方案2】:

    感谢 Artem Bilan 的大力支持。我终于找到了解决方案。似乎还有另一个名为 transactionManager 和 transactionInterceptor 的 bean 处于活动状态。这导致了奇怪的行为,即我的跨管理器从未初始化,而是将另一个事务管理器(null)用于事务拦截器和轮询消费者。这就是为什么我在 PollingConsumer 中的 Transactionmanager 为空,以及为什么我的 Transactions 从来没有工作。

    解决方案是重命名我所有的 bean,对于一些 bean,我还使用注解 @Primary 来告诉 spring 在自动装配时始终使用这个特定的 bean。

    我还降级了两个 4.3,只是为了确保这不是与版本 5 相关的错误。我还没有测试它是否可以与 V 5 一起使用,但我认为它也应该可以工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-12-01
      • 1970-01-01
      • 2018-04-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多