【问题标题】:What is the difference between Multi-threaded Step and Local Partitioning in spring batch?Spring Batch中的多线程步骤和本地分区有什么区别?
【发布时间】:2019-08-13 06:06:01
【问题描述】:

我有以下doc

还有人提到:

1.1。多线程 Step 启动并行处理的最简单方法是将 TaskExecutor 添加到 Step 配置中。

使用java配置时,可以在step中添加TaskExecutor 如下例所示:

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

上面配置的结果是Step执行的 读取、处理和写入每个项目块(每次提交 间隔)在单独的执行线程中。请注意,这意味着 要处理的项目没有固定的顺序,并且有一个块 可能包含与 单线程案例。除了任务设置的任何限制 executor(比如是否有线程池支持),有一个 tasklet 配置中的节流限制,默认为 4。你 可能需要增加此值以确保线程池完全 使用。

但在我认为应该通过本地分区来实现之前,我应该提供一个分区器来说明如何将数据分成几部分。多线程 Step 应该自动完成。

问题

你能解释一下它是如何工作的吗?除了线程号,我该如何管理它?它适用于平面文件吗?

附言

我创建了示例:

@Configuration
public class MultithreadedStepConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Value("${app.single-file}")
    Resource resources;

    @Bean
    public Job job(Step databaseToDataBaseLowercaseSlaveStep) {
        return jobBuilderFactory.get("myMultiThreadedJob")
                .incrementer(new RunIdIncrementer())
                .flow(csvToDataBaseSlaveStep())
                .end()
                .build();
    }

    private Step csvToDataBaseSlaveStep() {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderMulti())
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .taskExecutor(jobTaskExecutorMultiThreaded())
                .build();

    }

    @Bean
    @StepScope
    public FlatFileItemReader csvPersonReaderMulti() {
        return new FlatFileItemReaderBuilder()
                .name("csvPersonReaderSplitted")
                .resource(resources)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .saveState(false)
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutorMultiThreaded() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // there are 21 sites currently hence we have 21 threads
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadGroupName("multi-");
        taskExecutor.setThreadNamePrefix("multi-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}

它确实根据日志有效,但我想知道详细信息。是不是比自己写的分区器好?

【问题讨论】:

    标签: java multithreading spring-batch partitioning


    【解决方案1】:

    当您使用多线程步骤和分区时,这里基本上存在根本区别。

    多线程步骤是单个进程,因此如果您有处理器/写入器的持久状态,则使用它不是一个好主意。但是,如果您只是生成报告而不保存任何内容,这是一个不错的选择。

    正如您所提到的,您想处理一个平面文件并说您想将记录存储在 DB 中,然后假设您的阅读器不重,您可以使用远程分块概念。

    Partitioner 将为您可以使用逻辑划分的每组数据创建单独的进程。

    希望这会有所帮助。

    【讨论】:

    • 您在谈论远程分区。我说的是本地分区
    猜你喜欢
    • 1970-01-01
    • 2016-04-13
    • 2018-12-01
    • 1970-01-01
    • 2020-01-25
    • 1970-01-01
    • 2015-01-10
    • 2021-02-20
    相关资源
    最近更新 更多