【问题标题】:Spring-batch exiting with Exit Status : COMPLETED before actual job is finished?Spring-batch 以退出状态退出:在实际工作完成之前完成?
【发布时间】:2017-10-16 14:42:45
【问题描述】:

在我的 Spring Batch 应用程序中,我编写了一个 CustomItemWriter,它使用 DynamoDBAsyncClient 在内部将项目写入 DynamoDB,此客户端返回 Future 对象。我有一个包含数百万条记录的输入文件。由于 CustomItemWriter 立即返回未来对象,因此我的批处理作业在 5 秒内退出,状态为 COMPLETED,但实际上将所有项目写入数据库需要 3-4 分钟,我希望该批处理作业仅在所有项目写入数据库后完成。我该怎么做?

job 定义如下

    <bean id="report" class="com.solution.model.Report" scope="prototype" />
        <batch:job id="job" restartable="true">
            <batch:step id="step1">
                <batch:tasklet>
                    <batch:chunk reader="cvsFileItemReader"  processor="filterReportProcessor" writer="customItemWriter"
                        commit-interval="20">
                    </batch:chunk>
                </batch:tasklet>
            </batch:step>
        </batch:job>
<bean id="customItemWriter" class="com.solution.writer.CustomeWriter"></bean>

CustomeItemWriter 定义如下

public class CustomeWriter implements ItemWriter<Report>{
    public void write(List<? extends Report> item) throws Exception {
    List<Future<PutItemResult>> list = new LinkedList();
    AmazonDynamoDBAsyncClient client = new AmazonDynamoDBAsyncClient();
        for(Report report : item) {
            PutItemRequest req = new PutItemRequest();
            req.setTableName("MyTable");
            req.setReturnValue(ReturnValue.ALL_ODD);
            req.addItemEntry("customerId",new 
            AttributeValue(item.getCustomeId()));
            Future<PutItemResult> res = client.putItemAsync(req);
            list.add(res);
            }
    }

}

主类包含

JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Exit Status : " + execution.getStatus());

由于在 ItemWriter 中它返回未来对象,它不会等待完成操作。从主要的角度来看,由于所有项目都已提交以编写批处理状态,因此显示为 COMPLETED 并且作业终止。 我希望只有在 DynamoDB 中执行实际写入后才能终止此作业。 我们可以有一些其他的步骤来等待这个或者一些 Listener 可用吗?

【问题讨论】:

  • 我们可以看看你的作家代码
  • 更新了代码。
  • Future 获取结果,该结果应该会阻止写入器,直到它实际完成。
  • 谢谢@M.Deinum

标签: spring amazon-dynamodb spring-batch


【解决方案1】:

这是一种方法。由于ItemWriter::write 不返回任何您可以使用监听器功能的东西。

@Component
@JobScope
public class YourWriteListener implements ItemWriteListener<WhatEverYourTypeIs> {


  @Value("#{jobExecution.executionContext}")
  private ExecutionContext executionContext;


  @Override
  public void afterWrite(final List<? extends WhatEverYourTypeIs> paramList) {
     Future future = this.executionContext.readAndValidate("FutureKey", Future.class);
     //wait till the job is done using future object
  }

  @Override
  public void beforeWrite(final List<? extends WhatEverYourTypeIs> paramList) {

  }

  @Override
  public void onWriteError(final Exception paramException, final List<? extends WhatEverYourTypeIs> paramList) {

  }
}

在您的编写器类中,除了将未来对象添加到 ExecutionContext 之外,一切都保持不变。

public class YourItemWriter extends ItemWriter<WhatEverYourTypeIs> {

  @Value("#{jobExecution.executionContext}")
  private ExecutionContext executionContext;

  @Override
  protected void doWrite(final List<? extends WhatEverYourTypeIs> youritems) 

     //write to DynamoDb and get Future object
    executionContext.put("FutureKey", future);
    }

  }

}

你可以在你的配置中注册监听器。这是一个java代码,你需要在你的xml中做同样的事情

@Bean
  public Step initStep() {

    return this.stepBuilders.get("someStepName").<YourTypeX, YourTypeY>chunk(10)
        .reader(yourReader).processor(yourProcessor)
        .writer(yourWriter).listener(YourWriteListener)
        .build();
  }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-06-13
    • 1970-01-01
    • 1970-01-01
    • 2020-05-05
    • 1970-01-01
    • 2016-07-27
    • 1970-01-01
    相关资源
    最近更新 更多