【Question title】: How to trigger Spring Batch from a Spring Integration Java DSL file poller (IntegrationFlows)
【Posted】: 2018-07-01 12:16:04
【Question】:

How can I trigger a Spring Batch job from Spring Integration using the Java DSL (IntegrationFlows)?

I have the code below, which polls a directory for files. When a new file is added to the directory, a message is generated, and I want to trigger a Spring Batch job at that point. Please advise.

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
            .transform(Transformers.fileToString())
            .channel(ApplicationConfiguration.INBOUND_CHANNEL)
            .get();
}

【Comments】:

    Tags: spring file spring-integration spring-batch poller


    【Answer 1】:

    The Spring Batch Reference Manual has a clean sample for this:

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(personJob());
        return fileMessageToJobRequest;
    }
    
    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
    
        return jobLaunchingGateway;
    }
    
    @Bean
    public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
        return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                        filter(new SimplePatternFileListFilter("*.csv")),
                c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
                handle(fileMessageToJobRequest()).
                handle(jobLaunchingGateway).
                log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
                get();
    }
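
A note on where `FileMessageToJobRequest` and its `toRequest` method come from: this is not a framework class. In the reference manual it is a small user-written class whose `toRequest(Message<File>)` method builds a `JobLaunchRequest`, putting the file's absolute path into the job parameters under the configured parameter name. The sketch below models only that mapping in plain Java so that it runs without Spring on the classpath. `LaunchRequest`, `FileToLaunchRequest`, and `setJobName` are hypothetical stand-ins rather than Spring types, and the null check is an addition of this sketch.

```java
import java.io.File;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Spring Batch's JobLaunchRequest:
// a job identifier plus the parameters it should be launched with.
final class LaunchRequest {
    final String jobName;
    final Map<String, String> parameters;

    LaunchRequest(String jobName, Map<String, String> parameters) {
        this.jobName = jobName;
        this.parameters = Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
    }
}

// Plain-Java model of the FileMessageToJobRequest bean configured above.
final class FileToLaunchRequest {
    private String jobName;            // models setJob(...)
    private String fileParameterName;  // models setFileParameterName(...)

    void setJobName(String jobName) {
        this.jobName = jobName;
    }

    void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    // Models toRequest(Message<File>): the file's absolute path becomes
    // a single job parameter stored under the configured name.
    LaunchRequest toRequest(File file) {
        if (jobName == null || fileParameterName == null) {
            // Assumption of this sketch: fail fast instead of launching nothing.
            throw new IllegalStateException("jobName and fileParameterName must be set");
        }
        Map<String, String> params = new LinkedHashMap<>();
        params.put(fileParameterName, file.getAbsolutePath());
        return new LaunchRequest(jobName, params);
    }
}
```

With `setJobName("personJob")` and `setFileParameterName("input.file.name")`, calling `toRequest(new File("people.csv"))` yields a request whose only parameter is `input.file.name`, holding the absolute path of that file.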
    

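    【Comments】:

    • Thanks for the reply. I get the error "The method handle(MessageHandlerSpec<?, ? extends MessageHandler>) in the type IntegrationFlowDefinition is not applicable for the arguments (FileMessageToJobRequest)" on handle(fileMessageToJobRequest()). Please advise.
    • OK. Then try this: handle(fileMessageToJobRequest(), "toRequest")
    • I tried handle(fileMessageToJobRequest(), "toRequest"), and it fails with "The method handle(String, String) in the type IntegrationFlowDefinition is not applicable for the arguments (FileMessageToJobRequest, String)"
    • What error does it give, exactly?
    • It gives "The method handle(String, String) in the type IntegrationFlowDefinition is not applicable for the arguments (FileMessageToJobRequest, String)"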
    【Answer 2】:

    Here is my code:

    @Bean
    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                                  TaskExecutor taskExecutor,
                                                  MessageSource<File> fileReadingMessageSource,
                                                  JobLaunchingGateway jobLaunchingGateway) {
    
        return IntegrationFlows.from(fileReadingMessageSource,
                c -> c.poller(Pollers.fixedDelay(period)
                        .taskExecutor(taskExecutor)
                        .maxMessagesPerPoll(maxMessagesPerPoll)))
                .handle(fileMessageToJobRequest(), "toRequest")
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }
    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
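        // Commented out in the post; a Job has to be set here, otherwise the request has no job to launch.
        // fileMessageToJobRequest.setJob(personJob());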
        return fileMessageToJobRequest;
    }
    
    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        // Commented out in the post; the launcher needs a JobRepository before it can run a job.
        // simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
    
        return jobLaunchingGateway;
    }
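
Leaving Spring aside, the behaviour this flow is meant to produce (one launch per newly arrived file that matches a pattern) can be modelled in a few lines of plain Java, which may help when reasoning about what each poll should do. This is only a sketch under stated assumptions: `DirectoryPoller` and `pollOnce` are hypothetical names, the `Consumer<Path>` stands in for the job-launching step, and remembering already-seen files is an assumption about the desired "launch once per file" semantics. Note that `Files` here is `java.nio.file.Files`, not the Spring Integration DSL factory of the same name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Plain-Java model of "poll a directory, launch once per new matching file".
final class DirectoryPoller {
    private final Path directory;
    private final String glob;                       // e.g. "*.csv"
    private final Consumer<Path> launcher;           // stands in for the job-launching step
    private final Set<Path> seen = new HashSet<>();  // files already handed to the launcher

    DirectoryPoller(Path directory, String glob, Consumer<Path> launcher) {
        this.directory = directory;
        this.glob = glob;
        this.launcher = launcher;
    }

    // Runs one poll cycle and returns how many new files were passed to the launcher.
    int pollOnce() {
        int launched = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
            for (Path file : stream) {
                // seen.add(...) returns false for files handled in an earlier cycle.
                if (Files.isRegularFile(file) && seen.add(file.toAbsolutePath())) {
                    launcher.accept(file);
                    launched++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return launched;
    }
}
```

In the real flow the poller calls the equivalent of `pollOnce()` on a schedule (the `fixedDelay` period), and the launcher role is played by the transformer plus `JobLaunchingGateway`.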
    
