【问题标题】:Do not run the IntegrationFlow when spring batch remote chunking worker application starts当spring批处理远程分块工作者应用程序启动时不要运行IntegrationFlow
【发布时间】:2022-01-17 05:59:12
【问题描述】:

我是 Spring 集成和批处理的新手,我想开发一个带有 master 和 worker 的远程分块批处理应用程序。我使用 spring 集成和 RabbitMQ 作为消息队列,应用程序运行良好但 worker itemProccessor 自动启动,但是我需要控制何时启动它。

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig {


    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests"))
                .channel(requestsChannel())
                .get();
    }

    @Bean
    public DirectChannel repliesChannel() {
        System.out.println("repliesChannel 3 ");
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
       ....
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
       ...
    }

    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        return this.remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
    }


}

那么我可以做些什么来手动启动工人部分?

【问题讨论】:

    标签: spring spring-boot rabbitmq spring-batch spring-integration


    【解决方案1】:

    给适配器一个id 并将自动启动设置为false。

    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests")
                    .id("inbound")
                    .autoStartup(false))
                .channel(requestsChannel())
                .get();
    }
    

    然后@Autowire适配器并启动它...

    @Autowired
    AmqpInboundChannelAdapter inbound;
    
    ...
        inbound.start();
    

    【讨论】:

    • 嗨,感谢您的回复,但我认为它仅适用于 inboundPolledAdapter 而不是 inboundAdapter
    • 我认为我错过了一些东西,因为在编译时它给了我:“无法解析方法'from(org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec,)'” .任何解决方案请
    • 对不起,我的错误; idautoStartup 直接进入适配器规范 - 请参阅编辑。
    • 谢谢,现在很好,但我收到以下错误:“需要一个找不到的 'org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter' 类型的 bean”...如果你能指出什么是错的,谢谢你非常匹配
    • 这没有意义;集成流程使用 id 作为名称注册 bean。您可能需要将 @DependsOn("inboundFlow") 添加到自动连接适配器的 bean。
    猜你喜欢
    • 2018-04-22
    • 2021-09-17
    • 1970-01-01
    • 1970-01-01
    • 2013-12-17
    • 1970-01-01
    • 2020-10-24
    • 1970-01-01
    • 2012-01-06
    相关资源
    最近更新 更多