【问题标题】:How to process files into chunks and write in multiple files using Spring batch?如何使用Spring批处理将文件处理成块并写入多个文件?
【发布时间】:2022-07-04 17:54:36
【问题描述】:
我有大约 50 个 CSV 文件,其中包含大约 6000 万条数据要处理。但我不希望将所有这些文件合并到单个 CSV 中。而是想合并一小块文件
示例 - 我想处理前三个文件并合并为单个 CSV。然后移动到接下来的三个文件。
目前,我正在使用 Spring 批处理 MultiResourceItemReader 读取所有文件并使用 flatItemWriter 合并到单个文件
【问题讨论】:
标签:
spring
spring-boot
spring-batch
spring-batch-tasklet
【解决方案1】:
是的。您可以创建一个块大小为 3 的面向块的步骤,其中项目的类型为 org.springframework.core.io.Resource。您可以使用ResourcesItemReader 读取文件并使用自定义项目编写器根据需要合并它们。
这是一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.ResourcesItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
@Configuration
@EnableBatchProcessing
public class SO72493462 {
@Bean
public ItemReader<Resource> itemReader(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
ResourcesItemReader resourcesItemReader = new ResourcesItemReader();
resourcesItemReader.setResources(resources);
return resourcesItemReader;
}
@Bean
public ItemWriter<Resource> itemWriter() {
return items -> {
// merge resources here (the list will contain at most 3 files at a time, see chunkSize)
};
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job")
.start(steps.get("step")
.<Resource, Resource>chunk(3)
.reader(itemReader(null))
.writer(itemWriter())
.build())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(SO72493462.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFiles", "/data/input*.csv")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}