【发布时间】:2019-08-13 06:06:01
【问题描述】:
我有以下doc。
还有人提到:
1.1。多线程 Step 启动并行处理的最简单方法是将 TaskExecutor 添加到 Step 配置中。
使用java配置时,可以在step中添加TaskExecutor 如下例所示:
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
上面配置的结果是Step执行的 读取、处理和写入每个项目块(每次提交 间隔)在单独的执行线程中。请注意,这意味着 要处理的项目没有固定的顺序,并且有一个块 可能包含与 单线程案例。除了任务设置的任何限制 executor(比如是否有线程池支持),有一个 tasklet 配置中的节流限制,默认为 4。你 可能需要增加此值以确保线程池完全 使用。
但在我认为应该通过本地分区来实现之前,我应该提供一个分区器来说明如何将数据分成几部分。多线程 Step 应该自动完成。
问题
你能解释一下它是如何工作的吗?除了线程号,我该如何管理它?它适用于平面文件吗?
附言
我创建了示例:
@Configuration
public class MultithreadedStepConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private ToLowerCasePersonProcessor toLowerCasePersonProcessor;
@Autowired
private DbPersonWriter dbPersonWriter;
@Value("${app.single-file}")
Resource resources;
@Bean
public Job job(Step databaseToDataBaseLowercaseSlaveStep) {
return jobBuilderFactory.get("myMultiThreadedJob")
.incrementer(new RunIdIncrementer())
.flow(csvToDataBaseSlaveStep())
.end()
.build();
}
private Step csvToDataBaseSlaveStep() {
return stepBuilderFactory.get("csvToDatabaseStep")
.<Person, Person>chunk(50)
.reader(csvPersonReaderMulti())
.processor(toLowerCasePersonProcessor)
.writer(dbPersonWriter)
.taskExecutor(jobTaskExecutorMultiThreaded())
.build();
}
@Bean
@StepScope
public FlatFileItemReader csvPersonReaderMulti() {
return new FlatFileItemReaderBuilder()
.name("csvPersonReaderSplitted")
.resource(resources)
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.saveState(false)
.build();
}
@Bean
public TaskExecutor jobTaskExecutorMultiThreaded() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// there are 21 sites currently hence we have 21 threads
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.setThreadGroupName("multi-");
taskExecutor.setThreadNamePrefix("multi-");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
它确实根据日志有效,但我想知道详细信息。是不是比自己写的分区器好?
【问题讨论】:
标签: java multithreading spring-batch partitioning