【发布时间】:2021-03-18 11:38:18
【问题描述】:
(注意这个问题可能与this question有关,但它的范围要小得多。)
我有这样定义的最简单的工作:
@Configuration
@EnableBatchProcessing
public class FileTransformerConfiguration {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
@Autowired
public FileTransformerConfiguration(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job transformJob() {
return this.jobBuilderFactory.get("transformJob").incrementer(new RunIdIncrementer())
.flow(transformStep()).end().build();
}
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter()).build();
}
@Bean
public ItemProcessor<String, String> processor() {
return item -> {
System.out.println("Converting item (" + item + ")...");
return item;
};
}
}
public class ItemReader implements ItemStreamReader<String> {
private Iterator<String> it;
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.it = Arrays.asList("A", "B", "C", "D", "E").iterator();
}
@Override
public String read() throws Exception {
return this.it.hasNext() ? this.it.next() : null;
}
@Override
public void close() throws ItemStreamException { }
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {}
}
@JobScope
public class ItemWriter implements ItemStreamWriter<String> {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(item -> System.out.println("Writing item: " + item));
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException { }
@Override
public void close() throws ItemStreamException { }
}
没有花哨的逻辑,只是通过管道移动字符串。
代码是这样调用的:
@SpringBootApplication
public class TestCmpsApplication {
}
@SpringBootTest(classes = {TestCmpsApplication.class})
public class FileTransformerImplIT {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job transformJob;
@Test
void test1() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
@Test
void test2() throws Exception {
String id = UUID.randomUUID().toString();
JobParametersBuilder jobParameters = new JobParametersBuilder();
jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
}
}
(注意需要两个测试,即使它们是相同的。第一个总是有效的。)
所以这很好用。但是,一旦我添加了这个:
@Bean
public Step transformStep() {
return this.stepBuilderFactory.get("transformStep")
.<String, String>chunk(1).reader(new ItemReader())
.processor(processor())
.writer(new ItemWriter())
.transactionAttribute(transactionAttribute()).build();
}
private TransactionAttribute transactionAttribute() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.NEVER.value());
return attribute;
}
现在第二个测试失败了。测试本身说
TransactionSuspensionNotSupportedException: Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension
虽然日志有助于提供此错误:
IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'
好的。我直接告诉工作永远不要使用交易,但不知何故,无论如何,有人创建了一个。所以让我们试试MANDATORY。现在测试有和上面一样的错误,日志现在说:
IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'
不知何故,有人创建了一个事务,但不是针对所有两个工作?当然SUPPORTS 会起作用。不,那么测试将失败并出现相同的异常,并且日志将显示以下内容:
OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3
我不知道发生了什么。显然有人在步骤之外创建了交易,但我不知道如何阻止它们。因为我宁愿没有交易。或者至少一个有效的事务管理是事务在连续调用两次时会以相同的方式工作。
我尝试了 Spring Batch 4.2、4.2.5、4.3 和 4.3.1。
我做错了什么?我怎样才能做到这一点?
【问题讨论】:
-
为什么要用空实现覆盖方法?
-
@silentsudo 这些不是接口吗?
-
Note there need to be two tests, even though they are identical:为什么?你有什么理由在作者上使用@JobScope?TaskletStep需要事务管理器。默认情况下,使用ResourcelessTransactionManager,它无论如何都不支持事务(正如您从日志中看到的Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension。那么为什么要尝试使用Propagation.NEVER 提供TransactionAttributes?
标签: java spring spring-batch