【发布时间】:2020-01-25 07:36:45
【问题描述】:
当我在多个线程中运行我的步骤时,我在处理结果集时在我的行映射器中得到一个空指针异常,即使对于具有显式空检查的条目也是如此。当我在没有taskExecutor()/ 单线程的情况下执行它时工作正常。我对几件事感到困惑。我的理解是,如果我将提交间隔指定为 100,核心线程数指定为 10,则每个线程都会拉出 100 个块并独立处理。
- chunking-reader-row mapper 三重奏是如何工作的?如果我的阅读器中有一个查询获取 100 万行和 1000 块大小,这是否意味着阅读器将访问数据库 1000 次?并且在每次行映射器将映射所有获取的 1000 行之后? 线程如何影响行映射器?
代码如下:
@Bean
public Step myStep() {
return stepBuilderFactory.get(STEP_NAME).<MyModel, MyModel> chunk(1000)
.reader(myModelReader())
.writer(myModelWriter())
.taskExecutor(taskExecutor())
.listener(stepExecutionNotificationListener)
.listener(chunkExecutionListener)
.build();
}
@Bean
public Job myJob() {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(myStep()).end().build();
}
@Bean
@StepScope
public JdbcCursorItemReader<MyModel> myModelReader(){
JdbcCursorItemReader<MyModel> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("my query fetching millions of records joining multiple tables from the db");
reader.setRowMapper(new MyModelRowMapper());
return reader;
}
public class MyModelRowMapperimplements RowMapper<MyModel>{
@Override
public MyModel mapRow(ResultSet rs, int rowNum) throws SQLException {
MyModel myModel = new MyModel();
myModel.setEmailAddress(checkIsEmpty(rs.getString("EMAIL_ADDRESS")) ? "" : rs.getString("EMAIL_ADDRESS").replace("|", "")); // ----- The line which is failing!!! -----
return person;
}
}
public boolean checkIsEmpty(String stringToCheck)
{
if(stringToCheck==null || stringToCheck.isEmpty() || stringToCheck.equals("null"))
{
return true;
}
return false;
}
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(25);
threadPoolTaskExecutor.setQueueCapacity(5);
threadPoolTaskExecutor.setThreadNamePrefix("MyModelBatch-");
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
}
编辑 1
除了在非线程上下文中工作之外,如果我使用一次结果集,它也可以工作。 我将代码更改为
String email = rs.getString("EMAIL_ADDRESS");
myModel.setEmailAddress(checkIsEmpty(email) ? "" : email.replace("|", ""));
【问题讨论】:
-
您是在问 Spring Batching 的工作原理还是如何修复 NPE?
-
我希望通过了解 NPE 的工作原理来修复它。公平吗?
-
不,因为您假设 NPE 在某种程度上与线程有关。运行调试器并查看什么是空的,或者将错误行分解为多行,以便您可以看到发生 NPE 的确切行。您的
RowMapper是线程安全的,因此您应该先检查简单的事情,然后再将问题归咎于线程(并且批处理是从头开始为多线程构建的)。 -
至于你的块和数据库命中,你是对的。 1000 块大小使得 1000 db 命中 100 万行。
-
1.在发布问题之前,我已经尝试过了。 rs.getString("EMAIL_ADDRESS").replace("|", "") 失败,无论我提供什么空检查。当我尝试在调试中在 null 和有效值之间对其进行评估时,表达式的值会不断变化。 2.关于1000分贝的通话。一般来说,有这么多的数据库点击是一个好习惯吗?
标签: java spring multithreading spring-boot spring-batch