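[Posted]: 2019-08-01 21:51:12
[Question]:
I have a batch job that reads records from SQL Server and writes them to MariaDB. Even though I have implemented partitioning in the batch process, the job is still very slow.
Below are the data source configurations for the source and target systems.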
@Bean(name = "sourceSqlServerDataSource")
public DataSource sqlServerDataSource() { // renamed: two methods named mysqlDataSource() cannot coexist in one class
    HikariDataSource hikariDataSource = new HikariDataSource();
    hikariDataSource.setMaximumPoolSize(100);
    hikariDataSource.setUsername(username);
    hikariDataSource.setPassword(password);
    hikariDataSource.setJdbcUrl(jdbcUrl);
    hikariDataSource.setDriverClassName(driverClassName);
    hikariDataSource.setPoolName("Source-SQL-Server");
    return hikariDataSource;
}

@Bean(name = "targetMySqlDataSource")
@Primary
public DataSource mysqlDataSource() {
    HikariDataSource hikariDataSource = new HikariDataSource();
    hikariDataSource.setMaximumPoolSize(100);
    hikariDataSource.setUsername(username);
    hikariDataSource.setPassword(password);
    hikariDataSource.setJdbcUrl(jdbcUrl);
    hikariDataSource.setDriverClassName(driverClassName);
    hikariDataSource.setPoolName("Target-Myql-Server");
    return hikariDataSource;
}
Below is my bean configuration for the thread pool task executor.
@Bean(name = "myBatchJobsThreadPollTaskExecutor")
public ThreadPoolTaskExecutor initializeThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(100);
    threadPoolTaskExecutor.setMaxPoolSize(200);
    threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
    threadPoolTaskExecutor.initialize();
    log.info("Thread Pool Initialized with min {} and Max {} Pool Size",
            threadPoolTaskExecutor.getCorePoolSize(),
            threadPoolTaskExecutor.getMaxPoolSize());
    return threadPoolTaskExecutor;
}
Here are the configured step and the partition step.
@Bean(name = "myMainStep")
public Step myMainStep() throws Exception {
    return stepBuilderFactory.get("myMainStep")
            .chunk(500)
            .reader(myJdbcReader(null, null))
            .writer(myJpaWriter())
            .listener(chunkListener)
            .build();
}

@Bean
public Step myPartitionStep() throws Exception {
    return stepBuilderFactory.get("myPartitionStep")
            .listener(myStepListener)
            .partitioner(myMainStep())
            .partitioner("myPartition", myPartition)
            .gridSize(50)
            .taskExecutor(asyncTaskExecutor)
            .build();
}
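The `myPartition` bean itself is not shown in the post. As a rough illustration of what such a partitioner typically computes, the sketch below splits an inclusive id range into at most `gridSize` contiguous sub-ranges and stores the bounds under the keys `parameter1` and `parameter2`, matching the names the reader pulls from `stepExecutionContext`. Everything here is an assumption: the class name `PartitionRanges`, the idea that the two parameters are lower and upper id bounds, and the use of plain maps. In Spring Batch, a `Partitioner` returns a `Map<String, ExecutionContext>` instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; this is not the asker's myPartition bean.
final class PartitionRanges {

    // Splits the inclusive range [minId, maxId] into at most gridSize contiguous sub-ranges.
    // Each entry maps a partition name to the bounds one worker step would read.
    static Map<String, Map<String, Integer>> split(int minId, int maxId, int gridSize) {
        if (gridSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize > 0 and maxId >= minId");
        }
        Map<String, Map<String, Integer>> partitions = new LinkedHashMap<>();
        long total = (long) maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        long start = minId;
        int index = 0;
        while (start <= maxId) {
            long end = Math.min(start + size - 1, maxId);
            Map<String, Integer> context = new LinkedHashMap<>();
            context.put("parameter1", (int) start); // assumed to be the lower bound
            context.put("parameter2", (int) end);   // assumed to be the upper bound
            partitions.put("partition" + index, context);
            index++;
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Print the ranges for 250,000 ids split across a grid size of 50.
        split(1, 250_000, 50).forEach((name, ctx) -> System.out.println(name + " -> " + ctx));
    }
}
```

If the real id distribution is skewed, equal-width ranges like these can leave some partitions with far more rows than others, which is worth checking before tuning thread counts.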
Updating the post with the reader and writer:
@Bean(name = "myJdbcReader")
@StepScope
public JdbcPagingItemReader myJdbcReader(
        @Value("#{stepExecutionContext[parameter1]}") Integer parameter1,
        @Value("#{stepExecutionContext[parameter2]}") Integer parameter2) throws Exception {
    JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReader();
    jdbcPagingItemReader.setDataSource(myTargetDataSource);
    jdbcPagingItemReader.setPageSize(500);
    jdbcPagingItemReader.setRowMapper(myRowMapper());
    Map<String, Object> paramaterMap = new HashMap<>();
    paramaterMap.put("parameter1", parameter1);
    paramaterMap.put("parameter2", parameter2);
    jdbcPagingItemReader.setQueryProvider(myQueryProvider());
    jdbcPagingItemReader.setParameterValues(paramaterMap);
    return jdbcPagingItemReader;
}

@Bean(name = "myJpaWriter")
public ItemWriter myJpaWriter() {
    JpaItemWriter<MyTargetTable> targetJpaWriter = new JpaItemWriter<>();
    targetJpaWriter.setEntityManagerFactory(localContainerEntityManagerFactoryBean.getObject());
    return targetJpaWriter;
}
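For readers less familiar with chunk-oriented steps, here is a simplified model of what `chunk(500)` implies for the reader and writer above: items are pulled one at a time and handed to the writer in lists of at most 500. This is only a sketch in plain Java. The class `ChunkLoopSketch` is hypothetical, and the real framework additionally wraps each chunk in a transaction and invokes listeners, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a chunk-oriented loop; not Spring Batch code.
final class ChunkLoopSketch {

    // Reads items one by one and passes them to the writer in lists of at most chunkSize.
    // Returns the number of writer calls, i.e. the number of chunks.
    static <T> int run(Iterator<T> reader, Consumer<List<T>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // one write call per full chunk
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer)); // final, possibly smaller chunk
            chunks++;
        }
        return chunks;
    }
}
```

Under this model, each partition issues one write call per 500 items, so the cost of a single write call multiplied by the number of chunks gives a rough lower bound on the time spent writing.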
Can someone explain how to improve read and write performance with Spring Batch?
[Comments]:
-
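Can you post the reader and writer as well?
-
@StanislavL Thanks for your reply. I have updated the post with the reader and writer.
-
Have you profiled it? What is the bottleneck?
-
@MichaelMinella Sorry, could you guide me or suggest how to do the profiling and find the bottleneck? I have implemented a step execution listener, and the time difference between job start and end is 30 minutes for 250,000 records.
-
1. Add the configuration needed to log the SQL that is generated to fetch the data. 2. Run that SQL manually in a SQL client and look at the query execution plan. Look for table scans; if you see one, consider creating an index. 3. Also check the latency between the application server and the database server.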
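Alongside the checks listed above, a coarse first step toward finding the bottleneck is to time the read side and the write side separately, since a single start-to-end duration does not say which one dominates. The following is a minimal sketch in plain Java. `PhaseTimer` and the phase names are hypothetical, and wiring it into the job (for example from listeners around reads and writes) is left open.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Hypothetical helper that accumulates elapsed time per named phase; safe to share across threads.
final class PhaseTimer {

    private final Map<String, LongAdder> nanosByPhase = new ConcurrentHashMap<>();

    // Runs the given work, adds its elapsed time to the phase total, and returns its result.
    <T> T time(String phase, Supplier<T> work) {
        long start = System.nanoTime();
        try {
            return work.get();
        } finally {
            nanosByPhase.computeIfAbsent(phase, k -> new LongAdder())
                    .add(System.nanoTime() - start);
        }
    }

    // Total time recorded for the phase in milliseconds, summed over all threads.
    // With parallel partitions this sum can exceed the wall-clock duration of the job.
    long totalMillis(String phase) {
        LongAdder adder = nanosByPhase.get(phase);
        return adder == null ? 0L : adder.sum() / 1_000_000L;
    }
}
```

Comparing `totalMillis("read")` with `totalMillis("write")` after a run indicates which side to investigate first, for instance by examining the read query's execution plan as suggested in the comment above.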
Tags: spring spring-boot spring-batch