【问题标题】:How to configure outbound channel adapter with synchronization factory in spring boot如何在 Spring Boot 中使用同步工厂配置出站通道适配器
【发布时间】:2021-01-13 15:02:21
【问题描述】:

Spring Boot 中的以下出站通道适配器配置相当于什么?假设定义了messageChanneltaskExecutorsynchronizationFactory

    <int:outbound-channel-adapter id="outboundChannelAdapter" channel="messageChannel" ref="handler" method="handle">
        <int:poller task-executor="taskExecutor" fixed-delay="500" receive-timeout="500" max-messages-per-poll="10">
            <int:transactional synchronization-factory="synchronizationFactory" isolation="READ_COMMITTED"/>
        </int:poller>
    </int:outbound-channel-adapter>

带有@Poller 注释的@ServiceActivator 似乎没有事务同步工厂的选项。 PollerMetadata 有一个选项,但我不确定如何将该实例连接到 @ServiceActivator

在这种情况下需要同步工厂,因为它是一个基于 DB 的通道,有多个线程从中读取。

【问题讨论】:

    标签: spring spring-boot spring-integration


    【解决方案1】:

    类似这样的:

    @ServiceActivator(inputChannel = "messageChannel", poller = @Poller("myPollerMetadata"))
    public void handle(Message<?> message) { // Or what is your service method signature
        ...
    }
    
    @Bean
    PollerMetadata myPollerMetadata(Executor taskExecutor, TransactionSynchronizationFactory synchronizationFactory) {
    
        PollerMetadata poller = new PollerMetadata();
        poller.setTransactionSynchronizationFactory(synchronizationFactory);
        poller.setMaxMessagesPerPoll(10);
        poller.setReceiveTimeout(500);
        poller.setTaskExecutor(taskExecutor);
        poller.setTrigger(new PeriodicTrigger(500));
        return poller;
    }
    

    您也可以考虑开始学习 Spring Integration Java DSL:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl。相同的配置看起来像这样:

    @Bean
    IntegrationFlow myHandlerFlow(Executor taskExecutor, TransactionSynchronizationFactory synchronizationFactory) {
        return IntegrationFlows.from("messageChannel")
                  .handle(handler, "handle", 
                                c -> c.poller(p -> p
                                         .fixedDelay(500)
                                         .transactionSynchronizationFactory(synchronizationFactory)
                                         .taskExecutor(taskExecutor)
                                         .receiveTimeout(500)
                                         .maxMessagesPerPoll(10)))
                  .get();
    }
    

    【讨论】:

    • 那行得通。感谢您的帮助 Artem。对于有用的提示。这看起来是一种更简单的配置方式。感谢您的时间。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-06
    • 1970-01-01
    • 2016-07-19
    • 2016-11-17
    • 1970-01-01
    • 2015-07-11
    相关资源
    最近更新 更多