【问题标题】:How can we share data between the different steps of a Job in Spring Batch?我们如何在 Spring Batch 中 Job 的不同步骤之间共享数据?
【发布时间】:2023-03-09 09:10:01
【问题描述】:

深入研究 Spring Batch,我想知道我们如何在 Job 的不同步骤之间共享数据?

我们可以为此使用 JobRepository 吗?如果是,我们该怎么做?

还有其他方法可以做到/实现相同的目标吗?

【问题讨论】:

    标签: spring-batch


    【解决方案1】:

    从一个步骤,您可以将数据放入StepExecutionContext。 然后,通过监听器,您可以将数据从 StepExecutionContext 提升到 JobExecutionContext

    JobExecutionContext 可在以下所有步骤中使用。

    小心:数据必须很短。 这些上下文通过序列化保存在JobRepository 中,并且长度是有限的(如果我没记错的话是 2500 个字符)。

    因此,这些上下文非常适合共享字符串或简单值,但不适用于共享集合或大量数据。

    共享大量数据并不是 Spring Batch 的理念。 Spring Batch 是一组不同的操作,而不是一个巨大的业务处理单元。

    【讨论】:

    • 您将如何共享潜在的大数据,例如在集合中?我的 itemProcessor 生成一个列表(要删除的记录),我需要将该列表向下传递给 tasklet 以进行处理(实际删除记录)。谢谢
    • 在这种情况下,工作范围会有所帮助吗?
    • @MichoRizo 如果列表很大,我建议使用像 redis/ecache 这样的缓存。我喜欢将上下文中的对象保持在相对较小的尺寸
    【解决方案2】:

    作业存储库间接用于在步骤之间传递数据(Jean-Philippe 说得对,最好的方法是将数据放入StepExecutionContext,然后使用详细命名的ExecutionContextPromotionListener 来促进步骤执行JobExecutionContext 的上下文键。

    请注意,还有一个侦听器用于将 JobParameter 键提升为 StepExecutionContext(更冗长的名称为 JobParameterExecutionContextCopyListener);如果你的工作步骤不是完全独立的,你会发现你经常使用这些。

    否则,您将使用更复杂的方案在步骤之间传递数据,例如 JMS 队列或(天堂禁止)硬编码文件位置。

    至于在上下文中传递的数据大小,我也建议您保持较小(但我没有任何细节

    【讨论】:

    【解决方案3】:

    您可以使用 Java Bean 对象

    1. 执行一步
    2. 将结果存储在 Java 对象中
    3. 下一步将引用相同的 java 对象以获取步骤 1 存储的结果

    通过这种方式,您可以根据需要存储大量数据

    【讨论】:

    • 下一步我将如何从第一步中获取对象。问题的重点是
    • @Elbek 自动装配它。第一步中的类自动装配了 POJO 并设置了数据,第二步中的类也具有自动装配的相同对象(除非您正在执行远程分区,否则应该是相同的实例)并使用 getter。
    • 您是如何在第 2 步的第 1 步中自动装配新创建的实例?您如何将新实例附加到 spring 上下文中?
    • @组件为POJO,第一步为@Autowired + Setters,后续为@Autowired + Getters。也可以在 Tasklets 中使用 JobScope 注释。
    【解决方案4】:

    这是我为保存可通过所有步骤访问的对象所做的操作。

    1. 创建了一个侦听器,用于在作业上下文中设置对象
    @Component("myJobListener")
    public class MyJobListener implements JobExecutionListener {
    
        public void beforeJob(JobExecution jobExecution) {
    
            String myValue = someService.getValue();
            jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
        }
    }
    
    1. 在作业上下文中定义了监听器
    <listeners>
             <listener ref="myJobListener"/>
    </listeners>
    
    1. 使用 BeforeStep 注释在步骤中消耗值
    @BeforeStep
    public void initializeValues(StepExecution stepExecution) {
    
    String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
    
    }
    

    【讨论】:

      【解决方案5】:

      我会说你有 3 个选择:

      1. 使用StepContext 并将其升级为JobContext 并且您可以从每个步骤访问它,您必须按照说明遵守大小限制
      2. 创建@JobScope bean 并向该bean 添加数据,@Autowire 在需要的地方使用它(缺点是它是内存结构,如果作业失败数据丢失,可能会导致可重启性问题)李>
      3. 我们需要跨步骤处理更大的数据集(读取 csv 中的每一行并写入 DB、从 DB 读取、聚合并发送到 API),因此我们决定在与 Spring Batch Meta 相同的 DB 中对新表中的数据进行建模表,将ids 保留在JobContext 中,并在需要时访问并在作业成功完成时删除该临时表。

      【讨论】:

      • 关于您的 2 选项。我可以通过这种方式从 writer 类访问 reader 类的 bean 集吗?
      • 从读者那里设置是什么意思?我们在 i 配置之外创建了 bean,并在需要的地方注入了它。您可以尝试了解如何将某些内容从读者推广到工作范围,但在我看来,这是在阅读器中定义具有工作范围的内容的奇怪解决方案。
      • 在这种情况下,工作范围会有所帮助吗?
      【解决方案6】:

      我被分配了一项任务来逐个调用批处理作业。每个作业都依赖于另一个作业。第一个作业结果需要执行后续作业程序。 我正在搜索如何在作业执行后传递数据。我发现这个 ExecutionContextPromotionListener 派上用场了。

      1) 我为“ExecutionContextPromotionListener”添加了一个 bean,如下所示

      @Bean
      public ExecutionContextPromotionListener promotionListener()
      {
          ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
          listener.setKeys( new String[] { "entityRef" } );
          return listener;
      }
      

      2) 然后我将其中一个侦听器附加到我的 Steps

      Step step = builder.faultTolerant()
                  .skipPolicy( policy )
                  .listener( writer )
                  .listener( promotionListener() )
                  .listener( skiplistener )
                  .stream( skiplistener )
                  .build();
      

      3) 我在我的 Writer 步骤实现中添加了 stepExecution 作为参考,并在 Beforestep 中填充

      @BeforeStep
      public void saveStepExecution( StepExecution stepExecution )
      {
          this.stepExecution = stepExecution;
      }   
      

      4) 在编写器步骤结束时,我将步骤执行中的值填充为如下所示的键

      lStepContext.put( "entityRef", lMap );
      

      5) 作业执行后,我从 lExecution.getExecutionContext() 并作为工作回复填充。

      6) 从作业响应对象中,我将获取值并在其余作业中填充所需的值。

      以上代码用于使用 ExecutionContextPromotionListener 将步骤中的数据提升到 ExecutionContext。 它可以在任何步骤中完成。

      【讨论】:

        【解决方案7】:

        使用ExecutionContextPromotionListener:

        public class YourItemWriter implements ItemWriter<Object> {
            private StepExecution stepExecution;
            public void write(List<? extends Object> items) throws Exception {
                // Some Business Logic
        
                // put your data into stepexecution context
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                stepContext.put("someKey", someObject);
            }
            @BeforeStep
            public void saveStepExecution(Final StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        }
        

        现在您需要将 PromotionListener 添加到您的工作中

        @Bean
        public Step step1() {
                return stepBuilder
                .get("step1")<Company,Company>  chunk(10)
                .reader(reader()).processor(processor()).writer(writer())
                .listener(promotionListener()).build();
        }
        
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String[] {"someKey"});
            listener.setStrict(true);
            return listener;
        }
        

        现在,在第 2 步中,从作业 ExecutionContext 获取数据

        public class RetrievingItemWriter implements ItemWriter<Object> {
            private Object someObject;
            public void write(List<? extends Object> items) throws Exception {
                // ...
            }
            @BeforeStep
            public void retrieveInterstepData(StepExecution stepExecution) {
                JobExecution jobExecution = stepExecution.getJobExecution();
                ExecutionContext jobContext = jobExecution.getExecutionContext();
                this.someObject = jobContext.get("someKey");
            }
        }
        

        如果您正在使用 tasklet,则使用以下内容获取或放置 ExecutionContext

        List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");
        

        【讨论】:

        • 从官方文档中复制粘贴代码很容易。为什么你不提供你自己的实现?每个人都知道它是用文档写的。
        • 我就是这么做的。我提供了易于理解的部分代码。而且,文档中是否提供相同的内容?我不知道。
        【解决方案8】:

        您可以将数据存储在简单对象中。喜欢:

        AnyObject yourObject = new AnyObject();
        
        public Job build(Step step1, Step step2) {
            return jobBuilderFactory.get("jobName")
                    .incrementer(new RunIdIncrementer())
                    .start(step1)
                    .next(step2)
                    .build();
        }
        
        public Step step1() {
            return stepBuilderFactory.get("step1Name")
                    .<Some, Any> chunk(someInteger1)
                    .reader(itemReader1())
                    .processor(itemProcessor1())
                    .writer(itemWriter1(yourObject))
                    .build();
        }
        
        public Step step2() {
            return stepBuilderFactory.get("step2Name")
                    .<Some, Any> chunk(someInteger2)
                    .reader(itemReader2())
                    .processor(itemProcessor2(yourObject))
                    .writer(itemWriter2())
                    .build();
        }
        

        只需将数据添加到写入器或任何其他方法中的对象,然后在下一步的任何阶段获取它

        【讨论】:

          【解决方案9】:

          正如 Nenad Bozic 在他的第三个选项中所说,使用临时表在步骤之间共享数据,使用上下文共享也做同样的事情,它写入表并在下一步加载,但如果你写入临时表,你可以在工作结束时进行清洁。

          【讨论】:

            【解决方案10】:

            另一种非常简单的方法,留在这里以备将来参考:

            class MyTasklet implements Tasklet {
                @Override
                public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
                    getExecutionContext.put("foo", "bar");
                }
            }
            

            class MyOtherTasklet implements Tasklet {
                @Override
                public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
                    getExecutionContext.get("foo");
                }   
            }
            

            getExecutionContext 这里是:

            ExecutionContext getExecutionContext(ChunkContext chunkContext) {
                return chunkContext.getStepContext()
                                   .getStepExecution()
                                   .getJobExecution()
                                   .getExecutionContext();
            }     
            

            把它放在一个超类中,作为default方法的接口,或者简单地粘贴到你的Tasklets中。

            【讨论】:

              【解决方案11】:

              Spring Batch 为自己创建元数据表(如batch_job_executionbatch_job_execution_contextbatch_step_instance 等)。

              我已经测试(使用 postgres DB)你可以在一列中至少有 51,428 个字符的数据(batch_job_execution_context.serialized_content)。它可能更多,这就是我测试的程度。

              当您在步骤中使用 Tasklet(如 class MyTasklet implements Tasklet)并覆盖其中的 RepeatStatus 方法时,您可以立即访问 ChunkContext

              class MyTasklet implements Tasklet {
              
                  @Override
                  public RepeatStatus execute(@NonNull StepContribution contribution, 
                                              @NonNull ChunkContext chunkContext) {
                      List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
                      chunkContext.getStepContext().getStepExecution()
                      .getJobExecution()
                      .getExecutionContext()
                      .put("mydatakey", myObjects);
                  }
              }
              

              现在您有了另一个步骤,可以使用不同的 Tasklet 访问这些对象

              class MyOtherTasklet implements Tasklet {
              
                  @Override
                  public RepeatStatus execute(@NonNull StepContribution contribution, 
                                              @NonNull ChunkContext chunkContext) {
                      List<MyObject> myObjects = (List<MyObject>) 
                      chunkContext.getStepContext().getStepExecution()
                      .getJobExecution()
                      .getExecutionContext()
                      .get("mydatakey"); 
                  }
              }
              

              或者,如果您没有 Tasklet 并且有类似 Reader/Writer/Processor,那么

              class MyReader implements ItemReader<MyObject> {
              
                  @Value("#{jobExecutionContext['mydatakey']}")
                  List<MyObject> myObjects;
                  // And now myObjects are available in here
              
                  @Override
                  public MyObject read() throws Exception {
              
                  }
              }
              

              【讨论】:

                【解决方案12】:

                使用Tasklets 的简单解决方案。无需访问执行上下文。我使用地图作为数据元素来移动。 (Kotlin 代码。)

                小任务

                class MyTasklet : Tasklet {
                
                    lateinit var myMap: MutableMap<String, String>
                
                    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
                        myMap.put("key", "some value")
                        return RepeatStatus.FINISHED
                    }
                
                }
                

                批量配置

                @Configuration
                @EnableBatchProcessing
                class BatchConfiguration {
                
                    @Autowired
                    lateinit var jobBuilderFactory: JobBuilderFactory
                
                    @Autowired
                    lateinit var stepBuilderFactory: StepBuilderFactory
                
                    var myMap: MutableMap<String, String> = mutableMapOf()
                
                    @Bean
                    fun jobSincAdUsuario(): Job {
                        return jobBuilderFactory
                                .get("my-SO-job")
                                .incrementer(RunIdIncrementer())
                                .start(stepMyStep())    
                                .next(stepMyOtherStep())        
                                .build()
                    }
                
                    @Bean
                    fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")        
                        .tasklet(myTaskletAsBean())
                        .build()
                
                    @Bean
                    fun myTaskletAsBean(): MyTasklet {
                        val tasklet = MyTasklet()
                        tasklet.myMap = myMap      // collection gets visible in the tasklet
                        return tasklet
                    }
                }
                

                然后在MyOtherStep 中,您可以复制在MyStep 中看到的相同习语。这个其他 Tasklet 将看到在 MyStep 中创建的数据。

                重要

                • tasklet 是通过@Bean fun 创建的,因此它们可以使用@Autowired (full explanation)。
                • 为了实现更健壮的实现,tasklet 应使用 InitializingBean 实现
                覆盖乐趣 afterPropertiesSet() { Assert.notNull(myMap, "myMap 必须在调用 tasklet 之前设置") }

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 2015-08-26
                  • 2018-09-24
                  • 1970-01-01
                  • 2019-06-17
                  • 1970-01-01
                  相关资源
                  最近更新 更多