【发布时间】:2018-05-14 17:50:40
【问题描述】:
我遇到了 Spring Batch 的问题。我们在工作中使用了一个任务执行器(simpleAsyncTaskExecutor)来处理两个并行步骤的流程。
在每个步骤中,任务执行器将读取器返回的每个数据块拆分到不同的线程(使用多线程步骤概念:参见https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html)
问题是我们的提交间隔很大(24,000),读取器返回的行数非常少(少于 50 行)但是写入器有时会收到超过一个块(例如30 行和 20 行的块,但对于另一次运行,它可以是 25 行的块和 25 行的块或只有 50 行的块,它似乎是随机的)而它应该只接收 50 的块任何运行的行(它不应该是随机的),因为它不会超过提交间隔。
我试图了解为什么在某些运行中会随机发生这种情况。 如果有人知道 Spring Batch 中的这个问题,你能帮我吗?
谢谢。
这是我的工作配置(不包括我们的自定义编写器):
<batch:job id="job">
<batch:split id="split" task-executor="taskExecutor">
<batch:flow>
<batch:step id="step1">
<batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
<batch:chunk reader="reader1" writer="writer1" commit-interval="24000" />
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="step2">
<batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
<batch:chunk reader="reader2" writer="writer2" commit-interval="24000" />
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
</batch:job>
<bean id="reader1" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
<property name="dataSource" ref="postgresql_1" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
<property name="selectClause" value="
SELECT name
" />
<property name="fromClause" value="
FROM database.people
" />
<property name="whereClause" value="
WHERE age > 30
" />
<property name="sortKeys">
<map>
<entry key="people_id" value="ASCENDING"/>
</map>
</property>
</bean>
</property>
<property name="saveState" value="false" />
<property name="rowMapper">
<bean class="fr.myapp.PeopleRowMapper" />
</property>
</bean>
<bean id="reader2" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
<property name="dataSource" ref="postgresql_1" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
<property name="selectClause" value="
SELECT product_name
" />
<property name="fromClause" value="
FROM database.products
" />
<property name="whereClause" value="
WHERE product_order_date <= '01/11/2017'
" />
<property name="sortKeys">
<map>
<entry key="product_id" value="ASCENDING"/>
</map>
</property>
</bean>
</property>
<property name="saveState" value="false" />
<property name="rowMapper">
<bean class="fr.myapp.ProductsRowMapper" />
</property>
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="8" />
</bean>
【问题讨论】:
标签: spring-batch batch-processing