【问题标题】:How to process files into chunks and write in multiple files using Spring batch?如何使用Spring批处理将文件处理成块并写入多个文件?
【发布时间】:2022-07-04 17:54:36
【问题描述】:

我有大约 50 个 CSV 文件,其中包含大约 6000 万条数据要处理。但我不希望将所有这些文件合并到单个 CSV 中。而是想合并一小块文件

示例 - 我想处理前三个文件并合并为单个 CSV。然后移动到接下来的三个文件。

目前,我正在使用 Spring 批处理 MultiResourceItemReader 读取所有文件并使用 flatItemWriter 合并到单个文件

【问题讨论】:

    标签: spring spring-boot spring-batch spring-batch-tasklet


    【解决方案1】:

    是的。您可以创建一个块大小为 3 的面向块的步骤,其中项目的类型为 org.springframework.core.io.Resource。您可以使用ResourcesItemReader 读取文件并使用自定义项目编写器根据需要合并它们。

    这是一个简单的例子:

    import java.util.Arrays;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.file.ResourcesItemReader;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.Resource;
    
    @Configuration
    @EnableBatchProcessing
    public class SO72493462 {
    
        @Bean
        public ItemReader<Resource> itemReader(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
            ResourcesItemReader resourcesItemReader = new ResourcesItemReader();
            resourcesItemReader.setResources(resources);
            return resourcesItemReader;
        }
    
        @Bean
        public ItemWriter<Resource> itemWriter() {
            return items -> {
                // merge resources here (the list will contain at most 3 files at a time, see chunkSize)
            };
        }
    
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("job")
                    .start(steps.get("step")
                            .<Resource, Resource>chunk(3)
                            .reader(itemReader(null))
                            .writer(itemWriter())
                            .build())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(SO72493462.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("inputFiles", "/data/input*.csv")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    
    }
    
    

    【讨论】:

      猜你喜欢
      • 2020-06-15
      • 2019-03-16
      • 1970-01-01
      • 1970-01-01
      • 2015-07-12
      • 2013-09-11
      • 2013-11-21
      • 1970-01-01
      • 2015-06-03
      相关资源
      最近更新 更多