【问题标题】:Spring batch execute dynamically generated steps in a taskletSpring批处理在tasklet中执行动态生成的步骤
【发布时间】:2016-12-21 07:14:21
【问题描述】:

我有一个春季批处理作业,它执行以下操作...

步骤 1. 创建需要处理的对象列表

第 2 步。根据第 1 步中创建的对象列表中的项目数创建一个步骤列表。

步骤 3。尝试执行步骤 2 中创建的步骤列表中的步骤。

执行 x 步骤在下面的 executeDynamicStepsTasklet() 中完成。虽然代码运行没有任何错误,但它似乎没有做任何事情。我在该方法中的内容看起来正确吗?

谢谢

/* * */

@Configuration
public class ExportMasterListCsvJobConfig {

public static final String JOB_NAME = "exportMasterListCsv";
@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") 
public int chunkSize;

@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") 
public String masterListSql;

@Autowired
public DataSource onlineStagingDb;

@Value("${out.dir}") 
public String outDir;

@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}") 
private String promoStartDateEndDateSql;


private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping;

private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ;


@Bean
public Job exportMasterListCsvJob(
        @Qualifier("createJobDatesStep") Step createJobDatesStep,
        @Qualifier("createDynamicStepsStep") Step createDynamicStepsStep,
        @Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) {

    return jobBuilderFactory.get(JOB_NAME)
            .flow(createJobDatesStep)
            .next(createDynamicStepsStep)
            .next(executeDynamicStepsStep)
            .end().build();
}   


@Bean
public Step executeDynamicStepsStep(
        @Qualifier("executeDynamicStepsTasklet")  Tasklet executeDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("executeDynamicStepsStep")
                .tasklet(executeDynamicStepsTasklet)
                .build();               
}

@Bean
public Tasklet executeDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            FlowStep flowStep = new FlowStep(createParallelFlow());
            SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep);
            return RepeatStatus.FINISHED;
        }
    };
}

public Flow createParallelFlow() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1); 

    List<Flow> flows = dynamicSteps.stream()
            .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
            .collect(Collectors.toList());

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows.toArray(new Flow[flows.size()]))
            .build();
}

@Bean

public Step createDynamicStepsStep(
        @Qualifier("createDynamicStepsTasklet")  Tasklet createDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("createDynamicStepsStep")
                .tasklet(createDynamicStepsTasklet)
                .build();               
}   

@Bean
@JobScope
public Tasklet createDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){

                System.err.println("grp: " + grp);

                String stepName = "stp_" + grp;

                String fileName = grp + FlatFileConstants.EXTENSION_CSV;

                Step dynamicStep = 
                        stepBuilderFactory.get(stepName)
                        .<MasterList,MasterList> chunk(10)
                        .reader(queryStagingDbReader(
                                grp.getDivisionId(), 
                                grp.getRpmPromoCompDetailStartDate(), 
                                grp.getRpmPromoCompDetailEndDate()))
                        .writer(masterListFileWriter(fileName))                                
                        .build(); 

                dynamicSteps.add(dynamicStep);

            } 
            System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps);
            return RepeatStatus.FINISHED;
        }
    };
}


public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, fileName )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

所以现在我有一个需要执行的动态步骤列表,我相信它们在 StepScope 中。有人可以告诉我如何执行它们

【问题讨论】:

    标签: spring spring-batch


    【解决方案1】:

    这行不通。您的 Tasklet 只是创建一个以 FlowStep 作为第一步的作业。使用 jobBuilderfactory 只是创建作业。它不会启动它。方法名“start”可能会产生误导,因为它只定义了第一步。但它不会启动作业。

    一旦启动,您就无法更改作业的结构(其步骤和子步骤)。因此,不可能根据步骤 1 中计算的内容在步骤 2 中配置 flowstep。(当然,您可以在 springbatch 结构中更深入地进行一些黑客攻击并直接修改 bean 等等......但你没有不想那样做)。

    我建议您使用一种带有适当 postConstruct 方法的“SetupBean”,该方法被注入到配置您的工作的类中。这个“SetupBean”负责计算正在处理的对象列表。

    @Component
    public class SetUpBean {
    
      private List<Object> myObjects;
    
      @PostConstruct
      public afterPropertiesSet() {
        myObjects = ...;
      }
    
      public List<Object> getMyObjects() {
       return myObjects;
      }
    }
    
    @Configuration
    public class JobConfiguration {
    
       @Autowired
       private JobBuilderFactory jobBuilderFactory;
    
       @Autowired
       private StepBuilderFactory stepBuilderFactory;
    
       @Autowired
       private SetUpBean setup;
    
       ... 
    }
    

    【讨论】:

    • 谢谢。在那个 setupBean 中是否可以使用 JobScope 注释访问 jobParameters?
    • Job 没有运行,所以还没有 JobScope,因此 JobScope 注释将不起作用。您是否将 jobParameters 作为命令行参数提供?您是否正在使用 SpringBoot 启动您的工作?
    • 我是,但我的用户正在通过其他端点执行作业,因此命令行参数不是一个选项。我可以花点时间问你是否必须根据数据中的某个键将阅读器数据拆分为多个输出文件,你会怎么做?
    • 能否请您打开另一个问题以将输入拆分为多个输出文件,因为作为真正的答案而不是评论更容易回答。此外,您还可以考虑提出一个关于如何使用休息服务提供的参数来配置作业的新问题。只需在此处添加问题的链接作为评论,我们就会回答。
    • 谢谢。其实我已经有帖子了。 stackoverflow.com/questions/38871808/… 。那里的人提出了与您非常相似的建议。即有一个设置 bean(他们称之为 JobDatesCreator),但他们没有考虑到我在创建日期列表时访问该设置 bean 中的作业参数并使用作业参数作为输入的要求。在一个完美的世界里,我想实现那个人的回答,但正如你所指出的,我的要求是我认为不可能的
    猜你喜欢
    • 2016-01-03
    • 1970-01-01
    • 2016-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多