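[Posted]: 2015-02-28 22:44:04
[Question]:
I'm using Spring Batch with an AsyncItemProcessor, but things are behaving unexpectedly. Let me show the code first.
The application class follows a simple example shown on the Spring Batch project page: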
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
//SpringApplication.run(PerfilEletricoApp.class, args);
}
}
-- EDIT
If I simply make the main thread sleep for a few seconds, giving slf4j time to write and flush the logs, everything works as expected.
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
//System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);
Thread.sleep(1000 * 5);
System.exit(SpringApplication.exit(context));
}
}
-- END OF EDIT
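The fixed five-second sleep above works only as long as the background tasks happen to finish within that window. A minimal sketch of the more deterministic alternative, using plain `java.util.concurrent` rather than Spring: shut the pool down and wait for it to drain. The class name `AwaitInsteadOfSleep`, the pool size, the 50 ms delay, and the 10 s timeout are all illustrative assumptions, not part of the original job.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the batch job: not the poster's code.
class AwaitInsteadOfSleep {

    /** Submits `tasks` slow jobs, blocks until all have finished, and returns how many completed. */
    static int runAndAwait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // stands in for the HTTP GET plus logging
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new work, but let queued tasks run
        try {
            // Wait for the tasks themselves instead of sleeping for a guessed duration.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks still running after timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runAndAwait(20));
    }
}
```

Spring's `ThreadPoolTaskExecutor` exposes `setWaitForTasksToCompleteOnShutdown(boolean)` and `setAwaitTerminationSeconds(int)` for the same purpose. Whether they are enough here depends on how the application context is closed, which this sketch does not cover.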
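I'm reading a text file with fields and then processing each item with an AsyncItemProcessor for multithreading. The processing consists of an HTTP GET on a URL to fetch some data, and I use a NoOpWriter for the write part. I record the result of the GET in the processor part of the job (using log.trace / log.warn).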
@Configuration
public class HttpClientConfigurer {
// [... property and configs omitted]
@Bean
public CloseableHttpClient createHttpClient() {
// ... creates and returns a poolable http client etc
}
}
About the job:
@Configuration
public class BatchJobConfigurer {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Value("${async.tps:10}")
private Integer tps;
@Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
private String sourceDir;
@Bean
public ItemReader<String> reader() {
MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
return reader;
}
@Bean
public ItemReader<String> flatItemReader() {
FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new DefaultLineMapper<String>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "sample-field-001"});
}});
setFieldSetMapper(new SimpleStringFieldSetMapper<>());
}});
return itemReader;
}
@Bean
public ItemProcessor asyncItemProcessor(){
AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<String,OiPaggoResponse> processor(){
return new PerfilEletricoItemProcessor();
}
/**
* Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
* @return a NoOpItemWriter<OiPaggoResponse>
*/
@Bean
public ItemWriter<OiPaggoResponse> writer() {
return new NoOpItemWriter<>();
}
@Bean
protected Step step1() throws Exception {
/*
The problem starts here. If I use processor(), everything ends nicely, but if I insist on asyncItemProcessor(), the job ends and the logs from the processor are not written to disk.
*/
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
}
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(tps);
executor.setMaxPoolSize(tps);
executor.setQueueCapacity(tps * 1000);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("AsyncExecutor-");
return executor;
}
}
-- UPDATE using AsyncItemWriter (working version)
/*Wrapped Writer*/
@Bean
public ItemWriter asyncItemWriter(){
AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
/*AsyncItemWriter defined on the steps*/
@Bean
protected Step step1() throws Exception {
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
-- Any ideas on why the AsyncItemProcessor doesn't wait for all of its child tasks to complete before sending the OK-Completed signal to the context?
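One way to picture the difference between the two step definitions: an AsyncItemProcessor hands back a `Future` for each item instead of the processed value, and something downstream has to call `Future.get()` before the work is known to be done. AsyncItemWriter does that unwrapping before delegating. A writer that ignores its input never blocks, so the step can complete while tasks are still running. The sketch below imitates this with plain `java.util.concurrent`. The class `FutureUnwrapSketch`, its method names, and the `response-for-` strings are hypothetical stand-ins, not Spring Batch APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the Future hand-off; not Spring Batch code.
class FutureUnwrapSketch {

    /** Plays the role of the async processor: returns immediately with a Future. */
    static Future<String> processAsync(ExecutorService pool, String item) {
        return pool.submit(() -> {
            Thread.sleep(50); // stands in for the HTTP GET
            return "response-for-" + item;
        });
    }

    /** Plays the role of the async writer: blocks on every Future in the chunk. */
    static List<String> unwrap(List<Future<String>> chunk) {
        List<String> results = new ArrayList<>();
        for (Future<String> future : chunk) {
            try {
                results.add(future.get()); // this call is what makes the caller wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("processing failed", e.getCause());
            }
        }
        return results;
    }

    /** Processes one chunk asynchronously and returns only after every item is done. */
    static List<String> runChunk(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> chunk = new ArrayList<>();
            for (String item : items) {
                chunk.add(processAsync(pool, item));
            }
            return unwrap(chunk);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChunk(java.util.Arrays.asList("a", "b", "c")));
    }
}
```

If `unwrap` were skipped, `runChunk` would return as soon as the items were submitted, which matches the symptom described above: the job reports completion while the processor threads are still logging.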
[Discussion]:
Tags: spring asynchronous spring-boot spring-batch spring-integration