【问题标题】:Spring Batch: pass data between reader and writerSpring Batch:在读写器之间传递数据
【发布时间】:2015-12-20 13:58:51
【问题描述】:

我想在我步骤的 Reader 中设置的 Writer 中获取数据。我通过http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html#passingDataToFutureSteps了解 ExecutionContexts(步骤和作业)和 ExecutionContextPromotionListener

问题在于,在 Writer 中,我正在检索 'npag' 的空值。

ItemWriter 上的行:

LOG.info("INSIDE WRITE, NPAG: " + nPag);

我一直在做一些没有运气的变通办法,正在寻找其他similar questions的答案...有什么帮助吗?谢谢!

这是我的代码:

阅读器

@Component
public class LCItemReader implements ItemReader<String> {

private StepExecution stepExecution;

private int nPag = 1;

@Override
public String read() throws CustomItemReaderException {

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("npag", nPag);
    nPag++;
    return "content";
}

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}
}

作家

@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {

private String nPag;

@Override
public void write(List<? extends String> continguts) throws Exception {
    try {
        LOG.info("INSIDE WRITE, NPAG: " + nPag);
    } catch (Throwable ex) {
        LOG.error("Error: " + ex.getMessage());
    }
}

@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    ExecutionContext jobContext = jobExecution.getExecutionContext();
    this.nPag = jobContext.get("npag").toString();
}
}

作业/步骤批量配置

@Bean
public Job lCJob() {
    return jobs.get("lCJob")
            .listener(jobListener)
            .start(lCStep())
            .build();
}

@Bean
public Step lCStep() {
    return steps.get("lCStep")
            .<String, String>chunk(1)
            .reader(lCItemReader)
            .processor(lCProcessor)
            .writer(lCItemWriter)
            .listener(promotionListener())
            .build();
}

听众

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
    executionContextPromotionListener.setKeys(new String[]{"npag"});
    return executionContextPromotionListener;
}

【问题讨论】:

  • 将步骤执行注入写入器并在 write() 期间读取 nPag

标签: spring spring-boot spring-batch


【解决方案1】:

ExecutionContextPromotionListener 特别声明它在步骤结束时工作,以便在编写器执行之后。所以我认为你指望的升职不会在你认为的时候发生。

如果我是你,我会在步骤上下文中设置它,如果你需要一步中的值,我会从步骤中获取它。否则我会将其设置为工作上下文。

另一方面是@BeforeStep。这标志着在步骤上下文存在之前执行的方法。您在阅读器中设置 nPag 值的方式是在步骤开始执行之后。

【讨论】:

    【解决方案2】:

    您甚至在读取器中设置 nPag 之前就尝试读取它的值,最终得到一个默认值为 null 的值。您需要在直接从执行上下文记录时读取 nPag 上的值。您可以保留对 jobContext 的引用。试试这个

    @Component
    @StepScope
    public class LCItemWriter implements ItemWriter<String> {
    
    private String nPag;
    private ExecutionContext jobContext;
    
    @Override
    public void write(List<? extends String> continguts) throws Exception {
        try {
            this.nPag = jobContext.get("npag").toString();
            LOG.info("INSIDE WRITE, NPAG: " + nPag);
        } catch (Throwable ex) {
            LOG.error("Error: " + ex.getMessage());
        }
    }
    
    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        jobContext = jobExecution.getExecutionContext();
    
    }
    }
    

    【讨论】:

      【解决方案3】:

      在您的 Reader 和 Writer 中,您需要实现 ItemStream 接口并使用 ExecutionContext 作为成员变量。这里我给出了处理器而不是 Writer 的示例,但同样适用于 Writer。它对我来说工作正常,我能够将值从读取器传递到处理器。

      我已经在阅读器的上下文中设置了值,并在处理器中获取了值。

      public class EmployeeItemReader implements ItemReader<Employee>, ItemStream {
      
          ExecutionContext context;
          @Override
          public Employee read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
              context.put("ajay", "i am going well");
              Employee emp=new Employee();
              emp.setEmpId(1);
              emp.setFirstName("ajay");
              emp.setLastName("goswami");
              return emp;
          }
          @Override
          public void close() throws ItemStreamException {
              // TODO Auto-generated method stub
          }
      
          @Override
          public void open(ExecutionContext arg0) throws ItemStreamException {
              context = arg0;
          }
      
          @Override
          public void update(ExecutionContext arg0) throws ItemStreamException {
              // TODO Auto-generated method stub
              context = arg0;
          }
      
      }
      

      我的处理器

      public class CustomItemProcessor implements ItemProcessor<Employee,ActiveEmployee>,ItemStream{
          ExecutionContext context;
          @Override
          public ActiveEmployee process(Employee emp) throws Exception {
              //See this line
              System.out.println(context.get("ajay"));
      
              ActiveEmployee actEmp=new ActiveEmployee();
              actEmp.setEmpId(emp.getEmpId());
              actEmp.setFirstName(emp.getFirstName());
              actEmp.setLastName(emp.getLastName());
              actEmp.setAdditionalInfo("Employee is processed");
              return actEmp;
          }
          @Override
          public void close() throws ItemStreamException {
              // TODO Auto-generated method stub
      
          }
          @Override
          public void open(ExecutionContext arg0) throws ItemStreamException {
              // TODO Auto-generated method stub
      
          }
          @Override
          public void update(ExecutionContext arg0) throws ItemStreamException {
              context = arg0;
      
          }
      
      }
      

      希望这会有所帮助。

      【讨论】:

        猜你喜欢
        • 2018-06-23
        • 1970-01-01
        • 1970-01-01
        • 2019-05-24
        • 1970-01-01
        • 1970-01-01
        • 2019-08-01
        • 2023-03-25
        • 1970-01-01
        相关资源
        最近更新 更多