【问题标题】:Spring Batch Transactions in StepExecutionListenerStepExecutionListener 中的 Spring 批处理事务
【发布时间】:2020-05-04 04:15:59
【问题描述】:

我有一个 spring 批处理作业,它从 Web 服务读取数据,在处理器中进行一些丰富,然后保存到 DB。如果有人为同一组参数运行相同的作业两次,我想删除 db 中的旧数据,然后作为该作业的一部分重新写入。

我已经在 Step Method 之前的 StepExecutionListener 中编写了删除逻辑。

如何使我的步骤具有事务性,以便在作业中出现错误时回滚删除操作?

this.stepBuilderFactory.get("xStep")
.<Item,Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()

【问题讨论】:

  • 我不明白这怎么可能。根据设计,Spring Batch 防止运行相同的作业实例两次以完成。对于同一组作业参数,您最终应该得到相同的作业实例,所以我看不出如何两次运行同一个作业实例并擦除以前的数据。你的意思是之前的执行失败了,你想从一个干净的“工作表”重新开始?
  • 要启用此功能,我将 System.currentTimeMillis() 添加到作业参数中。一个用例可能是源系统中的数据发生变化,我需要重新运行作业。
  • 我可以尝试使用与以前的System.currentTimeMillis() 对应的相同long 重新运行相同的作业,并最终在同一个作业实例上运行,但 Spring Batch 会阻止我如果上一次执行成功,则运行两次。

标签: java spring transactions spring-batch


【解决方案1】:

如何使我的步骤具有事务性,以便在作业中出现错误时回滚删除操作?

您可以将删除逻辑编写为项目编写器的一部分,该项目编写器在 Spring Batch 驱动的事务中调用。如果事务因任何原因回滚,您的删除操作将被回滚。请注意,item writer 不仅用于插入数据,还可以用于更新和删除数据(MongoItemWriter#setDelete 就是一个例子)。

【讨论】:

  • 当然我可以将此逻辑移至 ItemWriter。想确认是否有办法使 StepExecutionListener 中的 BeforeStep 方法成为事务的一部分。感谢您的回复。
【解决方案2】:

作业参数

如果已经存在具有相同“识别”作业参数的实例,作业将不会开始。

作业将关闭,但出现以下异常: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException

您可以创建一个随机生成的值或日期的作业参数。在您的侦听器中,您可以验证前一个作业实例的作业参数,不包括您的“唯一”作业参数。

事务监听器

将@Transactional 注释添加到侦听器以将其包装在事务中。

示例

@Transactional
public class DataErasureListener implements StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;

    @Override
    public void beforeStep(StepExecution stepExecution) {

        String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
        Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();

        int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);

        if (jobInstanceCount == 1) {
            //No prior run
            return;
        }

        //The list of JobInstances are in descending order of creation. Grab the 2nd one.
        JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
        JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
        Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();

        //Compare prior job parameters excluding "unique" job parameters
        currentJobParameters.remove("unique");
        priorJobParameters.remove("unique");

        if (currentJobParameters.equals(priorJobParameters)) {
            //Delete old data
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-02-14
    • 2016-12-12
    • 2021-05-31
    • 1970-01-01
    • 2012-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多