【问题标题】:Spring Batch : Job randomly selects chunks with less rows than the commit-intervalSpring Batch:作业随机选择行数少于提交间隔的块
【发布时间】:2018-05-14 17:50:40
【问题描述】:

我遇到了 Spring Batch 的问题。我们在工作中使用了一个任务执行器(simpleAsyncTaskExecutor)来处理两个并行步骤的流程。

在每个步骤中,任务执行器将读取器返回的每个数据块拆分到不同的线程(使用多线程步骤概念:参见https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html

问题是我们的提交间隔很大(24,000),读取器返回的行数非常少(少于 50 行)但是写入器有时会收到超过一个块(例如30 行和 20 行的块,但对于另一次运行,它可以是 25 行的块和 25 行的块或只有 50 行的块,它似乎是随机的)而它应该只接收 50 的块任何运行的行(它不应该是随机的),因为它不会超过提交间隔。

我试图了解为什么在某些运行中会随机发生这种情况。 如果有人知道 Spring Batch 中的这个问题,你能帮我吗?

谢谢。

这是我的工作配置(不包括我们的自定义编写器):

    <batch:job id="job">

             <batch:split id="split" task-executor="taskExecutor">  
                <batch:flow> 
                    <batch:step id="step1"> 
                        <batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
                              <batch:chunk reader="reader1" writer="writer1" commit-interval="24000" />         
                        </batch:tasklet>
                    </batch:step>
                </batch:flow>
                <batch:flow> 

                    <batch:step id="step2">
                        <batch:tasklet task-executor="taskExecutor" throttle-limit="4" >
                              <batch:chunk reader="reader2" writer="writer2" commit-interval="24000" />         
                        </batch:tasklet>
                    </batch:step>
                </batch:flow>       
             </batch:split>
        </batch:job>

    <bean id="reader1" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">       
         <property name="dataSource" ref="postgresql_1" />                           

         <property name="queryProvider">            
             <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
                    <property name="selectClause" value="
                        SELECT name
                    " />
                    <property name="fromClause" value="
                        FROM database.people
                    " />
                    <property name="whereClause" value="
                        WHERE age > 30
                    " />
                    <property name="sortKeys">
                        <map>
                            <entry key="people_id" value="ASCENDING"/>
                        </map>
                    </property>             
            </bean>         
        </property>         
        <property name="saveState" value="false" />         
        <property name="rowMapper">             
             <bean class="fr.myapp.PeopleRowMapper" />      
        </property>     
    </bean>

        <bean id="reader2" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">       
       <property name="dataSource" ref="postgresql_1" />        
       <property name="queryProvider">          
               <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider">
                    <property name="selectClause" value="
                         SELECT product_name
                    " />
                    <property name="fromClause" value="
                        FROM database.products
                    " />
                    <property name="whereClause" value="
                        WHERE product_order_date <= '01/11/2017'
                    " />
                    <property name="sortKeys">
                        <map>
                            <entry key="product_id" value="ASCENDING"/>
                        </map>
                    </property>             
               </bean>      
           </property>      
       <property name="saveState" value="false" />      
       <property name="rowMapper">
                <bean class="fr.myapp.ProductsRowMapper" />
       </property>  
    </bean>
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
          <property name="concurrencyLimit" value="8" />
    </bean>

【问题讨论】:

    标签: spring-batch batch-processing


    【解决方案1】:

    在待写入的记录数达到提交间隔(或块大小)之前,不应调用编写器的 write() 方法。在此之前唯一应该调用它的时间是如果 read() 在阅读器中返回 null 表示没有更多结果了。

    当您看到较小的块时,它是在工作的中间吗? RowMappers 中是否有任何逻辑或您省略的任何内容可以汇总读取结果?

    【讨论】:

    • 嗨,乔,谢谢您的回答。正如我在帖子中所说,这种行为似乎是随机的(有时读者发送 50 个中的 1 个块,我认为这是好的行为,有时是 2 个较小的块)所以我猜它更像是某个地方的错误,而不是某个地方的某些逻辑。 PersonRowMapper 仅将行映射到正确的 PersonBean。我看到在读者执行他的 read() 之后调用作者的 write() 方法时,作者在 2 个线程中收到 1 个块或 2 个较小的块
    猜你喜欢
    • 2016-10-18
    • 2015-12-10
    • 2014-08-13
    • 2019-09-26
    • 2011-10-11
    • 2015-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多