【问题标题】:Spring batch DB to JSON files [duplicate]Spring批处理数据库到JSON文件[重复]
【发布时间】:2020-11-12 23:14:26
【问题描述】:

这个问题似乎与this 重复,但不是

我的要求是使用 JdbcPagingItemReader 从 db 读取数据并处理单独的记录以进行一些额外的处理,并在编写器中为每个已处理的项目创建单独的 json 文件,文件名为 id_of_record_json_fie.txt强>

例如,如果阅读器读取 100 条记录,则必须创建 100 个 JSON 文件

最好的方法是什么,我们可以使用弹簧批处理吗?

更新 1-:

根据@Mahmoud 的回答,可以使用tasklet,我也尝试在面向块的步骤中实现自定义itemwriter,这似乎也有效

      @Override
        public void write(final List<? extends Person> persons) throws Exception {
            
            for (Person  person: persons) {
                objectMapper.writeValue(new File("D:/cp/dataTwo.json"), person);
            }
            
        }

【问题讨论】:

  • 是的,你可以。 SQL Reader 然后是一个创建 JSON 的处理器和一个写入 json 文件的 writer
  • @SimonMartinelli:我需要一些关于如何生成具有不同名称的单个 JSON 文件的帮助/指导,我知道 Spring 批处理编写器将整个块写入单个文件

标签: java spring-boot spring-batch


【解决方案1】:

使用面向块的 tasklet 是行不通的,因为将有一个单独的项目编写器,资源在其上预先设置,并且将在整个步骤中修复。使用复合项目编写器可能会奏效,但您需要预先知道要创建和配置多少不同的编写器。

我看到的最直接的选择是使用 tasklet,例如:

import java.util.Collections;
import java.util.HashMap;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING);}})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }
    
    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in execution context: in case of restart after failure, the job would resume where it left off.
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }
        
        private void write(Person person, ExecutionContext executionContext) throws Exception {
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }
        
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

此示例从 db 表中读取 10 个人并生成 10 个 csv 文件(person1.csvperson2.csv 等)

【讨论】:

  • 谢谢,非常感谢您的努力:),但是您认为使用 Spring Batch 是理想的解决方案吗?如果查询返回大量数据是否会影响性能,我假设我们不能在这里使用任何弹簧缩放选项?如果在块步骤中使用复合编写器,我们如何动态传递文件名?
  • 是的,我认为 Spring Batch 是一个不错的选择。您立即得到的是,如果发生故障,作业将从中断处重新开始(在我之前的示例中处理/写入项目后,您需要添加itemReader.update(executionContext);)。对于复合写入器方法,您可以将文件名动态传递给复合写入器,但只能在配置时,而不是在运行时。启动步骤后,您无法更改步骤的编写者。因此,如果您想要复合编写器方法,您需要预先计算有多少编写器并在复合中配置它们。
  • 在您的示例中,您需要预先确定您有 100 个不同的项目(即要创建 100 个文件),创建 100 个项目编写器实例并在您设置的复合项目编写器中配置它们步骤..
  • 您仍然可以使用缩放选项:1) 多线程步骤:在 tasklet 上设置 .taskExecutor(yourTaskExecutor) 并确保一切都是线程安全的 2) 使用分区:给每个 tasklet 一个不同的分区并它们并行工作。
  • 复合写入器方法对我来说似乎不合适,因为如果我们有 100 万条记录,我们需要预先计算它并且创建这么多对象会降低性能,
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-09-06
  • 2017-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多