【问题标题】:Spring batch rollback all chunks & write one file at a timeSpring批量回滚所有块并一次写入一个文件
【发布时间】:2018-07-15 16:15:54
【问题描述】:

我是春季批次的新手,我有几个问题。

问题 1: 我正在使用 MultiResourceItemReader 读取一堆 CSV 文件,并使用 JDBC 项目编写器批量更新数据库。提交间隔设置为 1000。如果有一个包含 10k 条记录的文件并且我在第 7 批时遇到 DB 错误,有什么办法可以回滚之前提交的所有块?

问题 2: 如果有两个文件,每个文件都有 100 条记录,并且提交间隔设置为 1000,那么 MultiResourceItemReader 会读取这两个文件并将其发送给 Writer。在这种情况下,有什么方法可以一次只写一个文件而忽略提交间隔,基本上只在 writer 中创建一个循环?

【问题讨论】:

    标签: spring spring-batch


    【解决方案1】:

    发布对我有用的解决方案,以防有人需要参考。

    对于问题 1,我能够使用作者中的 StepListenerSupport 并覆盖 BeforeStepAfterStep 来实现它。示例 sn-p 如下

    public class JDBCWriter extends StepListenerSupport implements ItemWriter<MyDomain>{
    
    private boolean errorFlag;
    
    private String sql = "{ CALL STORED_PROC(?, ?,  ?, ?, ?) }";
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Override
    public void beforeStep(StepExecution stepExecution){
    
        try{
            Connection connection = jdbcTemplate.getDataSource().getConnection();
    
            connection.setAutoCommit(false);
        }
        catch(SQLException ex){
            setErrorFlag(Boolean.TRUE);
        }
    }
    
    @Override
    public void write(List<? extends MyDomain> items) throws Exception{
    
        if(!items.isEmpty()){
    
                    CallableStatement callableStatement = connection.prepareCall(sql);
    
                    callableStatement.setString("1", "FirstName");
                    callableStatement.setString("2", "LastName");
                    callableStatement.setString("3", "Date of Birth");
                    callableStatement.setInt("4", "Year");
    
                    callableStatement.registerOutParameter("errors", Types.INTEGER, "");
    
                    callableStatement.execute();
    
                    if(errors != 0){
                        this.setErrorFlag(Boolean.TRUE);
                        }
                }
        else{
            this.setErrorFlag(Boolean.TRUE);
        }
    }
    
    @Override
    public void afterChunk(ChunkContext context){
        if(errorFlag){
            context.getStepContext().getStepExecution().setExitStatus(ExitStatus.FAILED); //Fail the Step
            context.getStepContext().getStepExecution().setStatus(BatchStatus.FAILED); //Fail the batch
        }
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution){
        try{
            if(!errorFlag){
                connection.commit();
            }
            else{
                connection.rollback();
                stepExecution.setExitStatus(ExitStatus.FAILED);
            }
        }
        catch(SQLException ex){
            LOG.error("Commit Failed!" + ex);
        }
    
        return stepExecution.getExitStatus();
    }
    
    public void setErrorFlag(boolean errorFlag){
        this.errorFlag = errorFlag;
        }
    }
    

    XML 配置:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <beans xmlns="http://www.springframework.org/schema/beans"
        ....
        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
    
    <job id="fileLoadJob" xmlns="http://www.springframework.org/schema/batch">
    
        <step id="batchFileUpload" >
            <tasklet>
                <chunk reader="fileReader"
                       commit-interval="1000"
                       writer="JDBCWriter"
                />
            </tasklet>
        </step>
    
    </job>
    
    <bean id="fileReader" class="...com.FileReader" />
    <bean id="JDBCWriter" class="...com.JDBCWriter" />
    
    </beans>
    

    【讨论】:

      【解决方案2】:

      问题 1:完成此任务的唯一方法是通过某种形式的补偿逻辑。您可以通过侦听器(例如ChunkListener#afterChunkError)来做到这一点,但实现取决于您。 Spring Batch 中没有任何东西知道输出的整体状态是什么以及如何将其回滚到当前事务之外。

      问题 2:假设您要为每个输入文件寻找一个输出文件,由于大多数 Resource 实现是非事务性的,因此与它们相关联的编写者会做一些特殊的工作来缓冲到提交点,然后刷新。这里的问题是,正因为如此,没有真正的机会将该缓冲区划分为多个资源。需要明确的是,它可以完成,您只需要自定义 ItemWriter 即可。

      【讨论】:

      • 谢谢!关于问题2,我正在考虑使用某种PeekableItemReader 实现自定义CompletionPolicy,但它正在进入无限循环。有没有关于如何将MultiResourceItemReaderPeekableItemReader 连接的工作示例?
      • 对于问题 2,你不需要在 reader 中修复东西,你需要在 writer 中修复东西。您已经知道资源已更改(假设您在项目上使用MultiResourceItemReaderResourceAware)。您只需要在写入时为每个资源创建一个新缓冲区,然后在提交点将它们刷新到适当的资源。
      • 正如您所建议的,我在该项目上使用MultiResourceItemReaderResourceAware,关于每个资源的缓冲区,您能否详细说明并指出任何可能的示例?谢谢!
      • 我没有例子。但我会想象一个Map&lt;Resource, StringBuilder&gt;,您可以在其中根据通过的资源写入适当的StringBuilder。然后当提交发生时,将每个StringBuilder 刷新到正确的资源。您可以通过查看FlatFileItemWriter 来了解缓冲区处理和刷新的工作原理。
      猜你喜欢
      • 2020-10-06
      • 1970-01-01
      • 2013-10-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-04
      • 2016-04-08
      • 2022-12-08
      相关资源
      最近更新 更多