【Question Title】:Spring Batch: restart persistent jobs after abnormal termination
【Posted】:2015-12-05 00:06:24
【Question】:

I have the following Spring Batch job configuration:

@Configuration
@EnableBatchProcessing
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .flow(stepA()).on("FAILED").to(stepC())
                .from(stepA()).on("*").to(stepB()).next(stepC())
                .end().build();
    }

    @Bean
    public Step stepA() {
        return stepBuilderFactory.get("stepA").tasklet(new RandomFailTasket("stepA")).build();
    }

    @Bean
    public Step stepB() {
        return stepBuilderFactory.get("stepB").tasklet(new PrintTextTasklet("stepB")).build();
    }

    @Bean
    public Step stepC() {
        return stepBuilderFactory.get("stepC").tasklet(new PrintTextTasklet("stepC")).build();
    }

}

I launch the job with the following code:

    try {
        Map<String,JobParameter> parameters = new HashMap<>();
        JobParameter ccReportIdParameter = new JobParameter("03061980");
        parameters.put("ccReportId", ccReportIdParameter);

        jobLauncher.run(job, new JobParameters(parameters));
    } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
            | JobParametersInvalidException e) {
        e.printStackTrace();
    }

Here is my test tasklet:

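public class PrintTextTasklet implements Tasklet {

    // Step name passed in from JobConfig, e.g. new PrintTextTasklet("stepB")
    private final String stepName;

    public PrintTextTasklet(String stepName) {
        this.stepName = stepName;
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        // Read the job parameter supplied at launch time
        String ccReportId = chunkContext.getStepContext().getStepExecution().getJobParameters().getString("ccReportId");
        System.out.println(stepName + " ccReportId: " + ccReportId);
        // Sleep for one minute so the application can be killed mid-step
        Thread.sleep(60 * 1000);
        return RepeatStatus.FINISHED;
    }

}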

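I use an H2 database as the persistent store for my jobs.

During job execution I kill my application. After the application restarts, I expect all unfinished jobs to continue from the step at which they were terminated, but nothing happens.

In addition, I added the following property to my application.properties file: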

spring.batch.job.enabled=false

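because I don't want Spring Batch to start new (not terminated) jobs automatically. I need to start all new jobs manually (on user request) and restart all uncompleted jobs on the next application run.

How should Spring Batch be configured in this case?

UPDATE

Now I'm trying to restart the jobs with the following method: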

public void restartUncompletedJobs() {
        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) {
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);

            for (JobExecution runningJob : runningJobs) {
                try {
                    jobOperator.restart(runningJob.getId());
                    logger.info("Restarted: " + runningJob);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

But it fails with the following exception:

org.springframework.batch.core.launch.NoSuchJobException: No job configuration with the name [job] was registered
    at org.springframework.batch.core.configuration.support.MapJobRegistry.getJob(MapJobRegistry.java:66) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at com.sun.proxy.$Proxy94.getJob(Unknown Source) ~[na:na]
    at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:275) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
    at com.sun.proxy.$Proxy96.restart(Unknown Source) ~[na:na]
    at com.example.domain.api.batch.job.ReportJobServiceImpl.restartUncompletedJobs(ReportJobServiceImpl.java:72) ~[classes/:na]
    at com.example.domain.api.Application.lambda$0(Application.java:46) [classes/:na]
    at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:672) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:690) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:321) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:957) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:946) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
    at com.example.domain.api.Application.main(Application.java:53) [classes/:na]

【Discussion】:

    Tags: java spring spring-batch


    【Solution 1】:

    I got it working by adding the following line

    jobRegistry.register(new ReferenceJobFactory(job));
    

    to the restartUncompletedJobs method, and by marking each stale execution as FAILED before restarting it:

    public void restartUncompletedJobs() {
        try {
            // Register the Job bean so that JobOperator can look it up by name
            jobRegistry.register(new ReferenceJobFactory(job));

            List<String> jobNames = jobExplorer.getJobNames();
            for (String jobName : jobNames) {
                Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(jobName);

                for (JobExecution runningJob : runningJobs) {
                    // The killed execution is still recorded as running; mark it FAILED so it can be restarted
                    runningJob.setStatus(BatchStatus.FAILED);
                    runningJob.setEndTime(new Date());
                    jobRepository.update(runningJob);
                    jobOperator.restart(runningJob.getId());
                    logger.info("Restarted: " + runningJob);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
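
The exception in the question comes from MapJobRegistry, which suggests that no job had been registered under the name "job" when JobOperator.restart looked it up. As an alternative to the manual register call, here is a minimal sketch, assuming the JobRegistry bean is the one supplied by @EnableBatchProcessing. Declaring a JobRegistryBeanPostProcessor should register every Job bean under its name at startup. The class name JobRegistryConfig is hypothetical and not part of the original post.

```java
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, shown for illustration only
@Configuration
public class JobRegistryConfig {

    // Registers each Job bean in the application context with the JobRegistry
    // under its job name, so that a lookup by name (here "job") can succeed.
    // Assumption: the injected JobRegistry is the one from @EnableBatchProcessing.
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }
}
```

With a bean like this in place, the explicit jobRegistry.register(...) call should no longer be needed. Keeping both would probably raise a DuplicateJobException, because the same job name would be registered twice.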
    

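    【Comments】:

    • How can we resume the batch job instead of restarting it from where it failed?
    • @alex How did you create the job object for this line: jobRegistry.register(new ReferenceJobFactory(job));
    • @john It is the definition of the job you want to restart. You should have your job defined somewhere; just pass that Job object. For example: public Job processJob() { return this.jobBuilderFactory.get("processJob") .start(orderStep1()) .next(orderStep2()) .build(); }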