[Posted]: 2021-07-01 22:41:00
[Question]:
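Update: I am re-running my code and have found that, in fact, none of my listeners are actually working...
I have a Spring Batch application in which I provide my own implementation of StepExecutionListener. I register it with a TaskletStep, but I never see the log messages that the beforeStep/afterStep methods should output: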
MyStepExecutionListener.java
public class MyStepExecutionListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // begin my own custom implementation
        LOGGER.info("Before the step!");
    }
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // begin my own custom implementation
        LOGGER.info("After the step!");
        // afterStep must return an ExitStatus; getStatus() would return a BatchStatus
        return stepExecution.getExitStatus();
    }
}
I define my Tasklet step in the BatchConfig.java class as:
public class BatchConfig {
    @Bean
    public static org.springframework.batch.core.scope.JobScope jobScope() {
        org.springframework.batch.core.scope.JobScope jobScope = new org.springframework.batch.core.scope.JobScope();
        jobScope.setProxyTargetClass(true);
        jobScope.setAutoProxy(true);
        return jobScope;
    }
    @Bean
    public static org.springframework.batch.core.scope.StepScope stepScope() {
        org.springframework.batch.core.scope.StepScope stepScope = new org.springframework.batch.core.scope.StepScope();
        stepScope.setProxyTargetClass(true);
        stepScope.setAutoProxy(true);
        return stepScope;
    }
    @Bean
    // @StepScope
    public StepExecutionListener stepExecutionListener() {
        return new MyStepExecutionListener();
    }
    @Bean
    @Qualifier("s3FlatfFileReaderForMktgOffrs")
    @StepScope
    public S3FlatFileItemReader<FieldSet> s3FlatfFileReaderForMktgOffrs() {
        return new S3FlatFileItemReader<>(lineMapper());
    }
    @Bean
    @Qualifier("s3FlatfFileReaderCustom")
    @StepScope
    public S3FlatFileItemReader<FieldSet> s3FlatfFileReaderCustom() {
        // Custom class that Extends FlatFileItemReader
        return new S3FlatFileItemReader<>(lineMapper());
    }
    @Bean
    @Qualifier("myCustomFileItemReader")
    @StepScope
    public ItemStreamReader<List<FieldSet>> myCustomFileItemReader(
            @Value("#{jobParameters}") Map jobParameters) {
        String fileName = (String) jobParameters.get("fileName");
        String region = (String) jobParameters.get("region");
        String bucketName = awsS3EastBucket;
        if (StringUtils.equals(region, Regions.US_WEST_2.getName())) {
            bucketName = awsS3WestBucket;
        }
        // Custom class that Extends FlatFileItemReader
        S3FlatFileItemReader<FieldSet> s3FileItemReader = s3FlatfFileReaderCustom();
        s3FileItemReader.setResource(S3_PROTOCOL_PREFIX + bucketName + SLASH + fileName);
        s3FileItemReader.setStrict(false);
        s3FileItemReader.setLinesToSkip(1);
        s3FileItemReader.setSaveState(false);
        AggregateItemReader aggregateItemReader = new AggregateItemReader(s3FileItemReader) {
            @Override
            protected String getItemKey(FieldSet item) {
                return item.readString(FIRST_NAME) + "-" +
                        item.readString(LAST_NAME);
            }
        };
        SynchronizedItemStreamReader<List<FieldSet>> fieldSetSynchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        fieldSetSynchronizedItemStreamReader.setDelegate(aggregateItemReader);
        return fieldSetSynchronizedItemStreamReader;
    }
    @Bean(name = "myCustomStep")
    @Scope("prototype")
    @SuppressWarnings("unchecked")
    public Step myCustomStep(PlatformTransactionManager transactionManager) {
        TaskletStep step = stepBuilderFactory.get("myCustomStep")
                .<List<FieldSet>, List<MyPayLoadRecord>>chunk(250)
                .reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(myCustomProcessor())
                .writer(myCustomWriter())
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .skip(DataValidationException.class)
                .listener(stepExecutionListener())
                .listener(new CustomReaderListener())
                .listener(new CustomProcessListener())
                .listener(new CustomWriteListener())
                .listener(new CustomSkipListener())
                .taskExecutor(batchTaskExecutor())
                .throttleLimit(maxThreads)
                .build();
        step.setTransactionManager(transactionManager);
        //step.registerStepExecutionListener(stepExecutionListener());
        step.registerChunkListener(new CustomChunkListener());
        return step;
    }
}
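I commented out step.registerStepExecutionListener(stepExecutionListener()); and instead tried registering the listener through the builder as shown above, but neither approach works. My understanding was that I only need to implement StepExecutionListener and then register it with the TaskletStep. Am I missing something here?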
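[Comments]:
-
How is stepExecutionListener() defined?
-
@RoddyoftheFrozenPeas I have now added the bean definition above!
-
How are you verifying that the listener "works"? Breakpoints in a debug session, or just checking the logs? (If the latter, have you confirmed that your logging level is configured correctly? The easiest way is to put a logging statement in the constructor of MyStepExecutionListener.)
-
Your afterStep method is also missing its return statement, so it shouldn't even compile.
-
A step should be @JobScope; I don't understand why you are using @Scope("prototype")?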
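The constructor-logging check suggested in the comments can be tried in isolation before touching the batch job. What follows is only a sketch under stated assumptions: it uses java.util.logging because the question does not say which framework LOGGER comes from; ProbeListener is a hypothetical stand-in for MyStepExecutionListener, with the Spring Batch interface left out so that it runs on a bare JDK; CapturingHandler and constructAt are illustrative helpers that do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class ListenerLoggingCheck {

    // Hypothetical stand-in for MyStepExecutionListener (assumption: the real
    // class would also implement StepExecutionListener).
    static class ProbeListener {
        static final Logger LOGGER = Logger.getLogger(ProbeListener.class.getName());

        ProbeListener() {
            // If this message never appears, either the object is never created
            // or INFO is filtered out by the logging configuration.
            LOGGER.info("MyStepExecutionListener constructed");
        }
    }

    // Collects log messages in memory so the check does not rely on console output.
    static class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Constructs the listener with the logger set to the given level and
    // returns whatever messages reached the handler.
    static List<String> constructAt(Level level) {
        Logger logger = ProbeListener.LOGGER;
        CapturingHandler handler = new CapturingHandler();
        Level previous = logger.getLevel();
        logger.addHandler(handler);
        logger.setLevel(level);
        try {
            new ProbeListener();
        } finally {
            // Restore the logger so repeated calls do not interfere with each other.
            logger.setLevel(previous);
            logger.removeHandler(handler);
        }
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println("Logger at INFO:    " + constructAt(Level.INFO));
        System.out.println("Logger at WARNING: " + constructAt(Level.WARNING));
    }
}
```

In this sketch the constructor message is captured when the logger is at INFO and dropped when it is at WARNING. Applied to the real listener, a missing constructor message would suggest that either the bean is never instantiated or the effective log level is above INFO; the sketch by itself cannot tell which, and it says nothing about whether beforeStep/afterStep are invoked.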
Tags: java spring spring-batch