【发布时间】:2019-11-24 06:50:51
【问题描述】:
我正在尝试从可能出现一个或多个文件的系统文件夹中轮询文件,对于这些文件,我必须只触发一次批处理作业,而不是次数等于文件夹中的文件数。在我的情况下,我的批处理一次处理多个文件,我只希望轮询器将信号发送到批处理一次以开始其工作。
尝试 poller.maxMessagesPerPoll(1) 等,但它有所不同。我面临的问题是,批处理作业被触发等于轮询文件夹中的文件数。我只想执行一次批处理
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(fileMessageBatchJob);
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
System.out.println("&&&&&&&&&&&&&&&&&&Inside Integration Flow!!!!");
return IntegrationFlows
.from(Files.inboundAdapter(new File("C:\\apps_data\\recv")),
c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
.filter(onlyT4F2())
.handle(fileMessageToJobRequest)
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").get();
}
@Bean
public GenericSelector<File> onlyT4F2() {
System.out.println("@@@@@@@Inside GenericSelector of XXX");
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().contains("XXX");
}
};
}
当前行为 - 当轮询器在给定位置检测到文件时,配置的批处理作业会触发多次。如果文件为 4,则批处理作业触发 4 次。
预期行为 - 文件轮询后,批处理作业应该只对任意数量的文件执行一次。因为批处理作业一次处理多个文件,所以不需要多次执行。
如果您需要我提供的任何其他信息,请告诉我。请优先提供帮助
【问题讨论】:
-
Spring 批处理作业从轮询位置显式读取所有文件,所以这里我只是使用轮询器来观察文件的特定格式是否即将到来然后触发批处理作业..
标签: spring-boot spring-integration spring-batch spring-integration-dsl poller