【问题标题】:Spring batch (spring boot - java config) multithreaded jdbc writerSpring Batch (spring boot - java config) 多线程 jdbc 编写器
【发布时间】:2017-03-21 18:53:47
【问题描述】:

我需要将大量数据写入mysql。

我想在多个线程或性能中进行。我想使用 spring 批处理分区,但以前从未这样做过。

我的spring批处理java配置(部分):

@Bean
ItemWriter<Event> writer() throws SQLException {
    return new CustomJdbcBatchDataWriter();
}

@Bean
public TaskExecutor taskExecutor(){
    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(threadsAmount);
    return asyncTaskExecutor;
}

@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1())
            .end()
            .build();
}

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Event, Event>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}

还有我的自定义 jdbc 编写器(必须编写它以禁用自动提交 - 以提高性能):

公共类 CustomJdbcBatchDataWriter 实现 ItemWriter {

@Override
public void write(List<? extends Event> items) throws Exception {
try (
    Connection connection = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/batch?useSSL=false&useServerPrepStmts=false&rewriteBatchedStatements=true",
            "user", "password") ) {
    connection.setAutoCommit(false);


    String sql = "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    PreparedStatement ps = connection.prepareStatement(sql);

    for (Event p : items) {
        try {
            ps.setString(1, p.getId());
       //Setting rest of data into prepared statement
       ...
            ps.addBatch();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    ps.executeBatch();
    connection.commit();
}
}

}

如何配置它以便在不同的线程中插入不同的数据以获得性能? 非常感谢任何帮助。

【问题讨论】:

  • 我不认为多线程是解决方案,因为你是 io 绑定而不是计算绑定
  • 但是如果我需要放置 1 亿条记录呢?我将在一个线程中插入 1000 万,然后在另一个线程中插入 1000 万,依此类推。
  • 同样不会加快速度,10个线程意味着cpu一次做10件事情,你需要一个磁盘一次做10件事情
  • @SteelToe 添加线程绝对会提高吞吐量。 Spring Batch 虽然速度很快,但从文件读取/写入的速度比大多数数据库在处理inserts 时要慢得多。通过增加 Spring Batch 中的线程数,我们看到了近乎线性的性能改进。
  • 您不需要自定义 jdbc 编写器,spring 编写器参与正在进行的事务并在块的末尾提交。您需要正确设置 tx 管理,您的也在泄漏连接,因此在 x 运行后您的数据库连接已用完。

标签: java mysql spring jdbc spring-batch


【解决方案1】:

这里有一些东西可以帮助您入门。我还没有测试它,但它至少应该让你接近。

//updated this bean of yours. the others are new
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(partitionStep())
            .end()
            .build();
}

@Bean
public Step partitionStep(){
    return stepBuilderFactory.get("partitionStep")
            .partitioner(step1()) //leverage the step you already have
            .partitioner("step1", partitioner())
            .gridSize(10) //# of threads
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Partitioner partitioner() {
    //Use this partitioner to add ranges for your reader
    //NOTE: your reader needs to be in @StepScope to pull from the Step Execution Context
    return new YourCustomPartitioner();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
}

【讨论】:

  • 谢谢。如果我想对 writer 进行分区,那么它也应该在 @StepScope 中?
  • 是的。但是,据我所知,您的编写器是线程安全的,因此可能不需要这样做。旁注:您可能需要考虑使用JdbcBatchItemWriter 而不是您的自定义。
猜你喜欢
  • 2017-07-28
  • 2017-01-29
  • 2015-02-01
  • 1970-01-01
  • 2015-12-10
  • 2017-10-20
  • 2018-07-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多