【问题标题】:Return job id "immediately" for spring batch job before it completes在春季批处理作业完成之前“立即”返回作业 ID
【发布时间】:2017-05-12 19:29:30
【问题描述】:

我正在做一个使用 Spring Boot、Spring Batch 和 Camel 的项目。

批处理是通过调用休息端点来启动的。休息控制器启动一个骆驼路线,该路线启动弹簧批处理作业流程(通过弹簧批处理骆驼组件)。

我无法控制调用我的应用程序的外部应用程序。我的申请是更大的夜间工作流程的一部分。

批处理作业可能需要很长时间才能完成,因此外部应用程序会通过另一个休息端点定期轮询我的批处理作业,询问作业是否完成。它通过使用它想要状态的 jobExecution 的 id 轮询状态休息端点来做到这一点。

为了完成这个流程,我实现了一个休息控制器,它通过 ProducerTemplate 启动骆驼路线。我的问题是在开始骆驼路线后立即返回作业执行 ID。我不想让 rest 调用等到作业完成才返回。

startJobViaRestCall ------> createBatchJob ----> runBatchJobUntilDone
                                  |
                                  |
       Return jobExecutionData    |
<----------------------------------

我尝试过使用异步调用和期货,但没有成功。我也尝试过使用 Camels 窃听器,但无济于事。问题是只有“onComplete”事件。我需要一个在创建作业后立即返回但不运行的钩子。

例如,以下代码等待批处理作业完成,然后返回我要发回的 JobExecution 数据(以 json 格式)。这是有道理的,因为 extractFutureBody 会等到响应准备好。

@RestController
@Slf4j
public class BatchJobController {

    @Autowired
    ProducerTemplate producerTemplate;

    @RequestMapping(value = "/batch/job/start", method = RequestMethod.GET)
    @ResponseBody
    public String startBatchJob() {
        log.info("BatchJob start called...");

        String jobExecution = producerTemplate.extractFutureBody(producerTemplate.asyncRequestBody(BatchRoute.ENDPOINT_JOB_START, ""), String.class);

        return jobExecution;
    }

}    

骆驼路线是对spring-batch-component的简单调用

public class BatchRoute<I, O> extends BaseRoute {

    private static final String ROUTE_START_BATCH = "spring-batch:springBatchJob";

    @Override
    public void configure() {

        super.configure();
        from(ENDPOINT_JOB_START).to(ROUTE_START_BATCH);

    }
}

关于如何尽快返回 JobExecution 数据的任何想法?

【问题讨论】:

    标签: spring-boot apache-camel spring-batch


    【解决方案1】:

    不确定如何在 Camel 中执行此操作,但这里是使用 spring-rest 执行作业的示例。

    @RestController
    public class KpRest {
    
        private static final Logger LOG = LoggerFactory.getLogger(KpRest.class);
        private static String RUN_ID_KEY = "run.id";
    
        @Autowired
        private JobLauncher launcher;
    
        private final AtomicLong incrementer = new AtomicLong();
    
    
        @Autowired
        private Job job;
    
    
        @RequestMapping("/hello")
        public String sayHello(){
    
            try {
                JobParameters parameters = new JobParametersBuilder().addLong(RUN_ID_KEY, incrementer.incrementAndGet()).toJobParameters();
                JobExecution execution = launcher.run(job, parameters);
                LOG.info("JobId {}, JobStatus {}", execution.getJobId(), execution.getStatus().getBatchStatus());
                return String.valueOf(execution.getJobId());
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                    | JobParametersInvalidException e) {
                LOG.info("Job execution failed, {}", e);
            }
            return "Some Error";
        }
    }
    

    您可以通过修改 JobLauncher 使 Job 异步。

        @Bean
        public JobLauncher simpleJobLauncher(JobRepository jobRepository){
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository);
            jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return jobLauncher;
        }
    

    更多信息请参考documentation

    【讨论】:

    • 谢谢! 'SimpleAsyncTaskExecutor' 是线索。我定义了一个自定义 JobLauncher,它删除了 Spring Batch Camel 组件添加的所有作业参数。我需要将“TaskExecutor”设置为“SimpleAsyncTaskExecutor”。它默认为“SyncTaskExecutor”,因为我的自定义 JobLauncher 扩展了“SimpleJobLauncher”。
    • 嗨 Karthik,有什么方法或机制可以让我在休息控制器中添加等待,直到 SimpleAsyncTaskExecutor 生成的所有线程完成并返回给调用者
    • @Barvepan 请参考上面给出的文档链接,默认情况下 JobLauncher 是同步的,对于 HTTP,我们将任务执行器更改为异步。所以注释掉设置taskExecutor。我宁愿保持异步,否则您的 HTTP 调用将阻塞调用。要在作业完成后获得通知,您可以通过挂接作业完成侦听器来使用 websocket 或 jms。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-10-19
    • 1970-01-01
    • 2020-04-23
    相关资源
    最近更新 更多