【问题标题】:Spring batch Step Partitionning with PagingAndSortingRepository使用 PagingAndSortingRepository 进行 Spring 批处理步骤分区
【发布时间】:2023-04-07 14:45:01
【问题描述】:

我正在尝试为分区配置 Spring Batch Steps。找到here 的漂亮示例显示了一个关于“id 范围”的分区,但我不知道从哪里开始“数据页”范围。

在我的连续步骤中,我有:

  • reader : 使用 PagingAndSortingRepository 的 RepositoryItemReader
  • 处理器:数据转换器
  • writer : 使用 CrudRepository 的 RepositoryItemWriter
  • 块:5
  • 监听器:一个 StepListener

return stepBuilderFactory.get("stepApplicationForm") .<OldApplicationForm, NewApplicationForm>chunk(5) .reader(reader).processor(processor).writer(writer) .listener(listener).build();

据我所知,对于分区,我必须创建一个分区器,然后我有一个“父”步骤告诉将分区器与子步骤一起使用,然后是“子”步骤,读者知道“分页”参数。

对于TaskExecutor,我认为ThreadPoolTaskExecutor 适合。

基于数据“页面”实现/配置分区的好方法是什么?我应该检查哪些线程注意事项?

谢谢:)

【问题讨论】:

    标签: java spring spring-batch


    【解决方案1】:

    每个分区都有自己的项目读取器和项目写入器实例。您的分区实现将找到数据负载的最小最大值。使用您自己的逻辑,您可以在执行上下文中创建最小值和最大值。在查询数据库时,您可以使用它们来处理特定的数据片段,这样就不会发生并发问题。

    @Bean
    public Step myMasterStep() {
        return  stepBuilderFactory.get("myMasterStep")
                .partitioner("mySlaveWorker", myPartitioner())
                .partitionHandler(myPartitionHandler()).build();
    }
    
    
    @Bean
        public Step mySlaveWorker() {
            return stepBuilderFactory
                    .get("mySlaveWorker")
                    .<OldApplicationForm, NewApplicationForm> chunk(5)
                    .faultTolerant()
                    .listener(MyStepListener())
                    .skip(DataAccessException.class)
                    .skip(FatalStepExecutionException.class)
                    .skip(Exception.class)
                    .skipLimit(75)
                    .noRollback(DataAccessException.class)
                    .noRollback(FatalStepExecutionException.class)
                    .noRollback(Exception.class)
                    .reader(myDataItemReader())
                    .writer(myDataItemWriter()).build();
        }
    
    @Bean
    @StepScope
    public MyDataItemReader myDataItemReader(
            @Value("#{stepExecutionContext[minId]}") Long minId,
            @Value("#{stepExecutionContext[maxId]}") Long maxId) {
        MyDataItemReader myDataItemReader = new MyDataItemReader();
        myDataItemReader.setPageSize(100);
        myDataItemReader.setMinId(minId);
        myDataItemReader.setMaxId(maxId);
        return myDataItemReader;
    }
    
    @Bean
    @StepScope
    public MyDataItemWriter myDataItemWriter() {
        return new MyDataItemWriter();
    }
    
    @Bean
    @StepScope
    public MyPartitioner myPartitioner() {
        MyPartitioner myPartitioner = new MyPartitioner();
        myPartitioner.setDataSource(dataSource);
    return myPartitioner;
    }
    
    public class MyStepListener implements SkipListener<OldApplicationForm, NewApplicationForm> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MyStepListener.class);
    
    
    public void onSkipInProcess(OldApplicationForm item, Throwable t) {
        LOGGER.error("onSkipInProcess" + t.getMessage());
    }
    
    public void onSkipInRead(Throwable t) {
        LOGGER.error("onSkipInRead " + t.getMessage());
    }
    
    
    public void onSkipInWrite(NewApplicationForm item, Throwable t) {
        //logs
        LOGGER.error("In MyStepListener --> onSkipInProcess" + t.getMessage());
    }
    
    }
    

    【讨论】:

    • 使用上面的代码,你也有跳过机制。当您不希望您的批次因少量错误记录而失败时,它尤其有用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多