ItemProcessor
在开发过程中,我们经常需要在读取数据后,经过一系列业务逻辑处理,再将数据写入指定的持久化存储。Spring Batch 为我们提供了 ItemProcessor 接口来进行数据处理。
我们可以构建 CompositeItemProcessor 的 Bean,将多个 Processor 按顺序组合在一起,再与 Step 绑定:
// Factory used by processorDemoJob() to create the job builder.
@Autowired
private JobBuilderFactory jobBuilderFactory;
// Factory used by processorDemoStep() to create the step builder.
@Autowired
private StepBuilderFactory stepBuilderFactory;
// Data source handed to the JDBC paging reader in dbJdbcDemoReader().
@Autowired
private DataSource dataSource;
// First delegate registered in processorDemoProcessor().
@Autowired
private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;
// Second delegate registered in processorDemoProcessor().
@Autowired
private ItemProcessor<Customer, Customer> idFilterProcessor;
/**
 * Declares the job named {@code processorDemoJob}, whose only step is
 * {@link #processorDemoStep()}.
 *
 * @return the assembled job
 * @throws Exception if the step cannot be built
 */
@Bean
public Job processorDemoJob() throws Exception {
    Step onlyStep = processorDemoStep();
    return jobBuilderFactory
            .get("processorDemoJob")
            .start(onlyStep)
            .build();
}
/**
 * Declares the chunk-oriented step named {@code processorDemoStep}.
 * It wires together the JDBC reader, the composite processor and the flat-file
 * writer defined in this configuration, using a chunk size of 100.
 *
 * @return the assembled step
 * @throws Exception if the flat-file writer cannot be created
 */
@Bean
public Step processorDemoStep() throws Exception {
    JdbcPagingItemReader<Customer> customerReader = dbJdbcDemoReader();
    CompositeItemProcessor<Customer, Customer> customerProcessor = processorDemoProcessor();
    FlatFileItemWriter<Customer> customerWriter = flatFileDemoFlatFileWriter();

    return stepBuilderFactory
            .get("processorDemoStep")
            .<Customer, Customer>chunk(100)
            .reader(customerReader)
            .processor(customerProcessor)
            .writer(customerWriter)
            .build();
}
/**
 * Builds a composite processor from the two injected delegates.
 * The delegates are registered in this order: first-name upper-casing, then the id filter.
 *
 * @return the composite processor holding both delegates
 */
@Bean
public CompositeItemProcessor<Customer, Customer> processorDemoProcessor() {
    List<ItemProcessor<Customer, Customer>> delegates = new ArrayList<>();
    delegates.add(firstNameUpperCaseProcessor);
    delegates.add(idFilterProcessor);

    CompositeItemProcessor<Customer, Customer> composite = new CompositeItemProcessor<>();
    composite.setDelegates(delegates);
    return composite;
}
/**
 * Step-scoped paging reader that loads {@code Customer} rows through the injected data source.
 * The query selects {@code id, firstName, lastName, birthdate} from {@code Customer},
 * sorted by {@code id} ascending, with a fetch size of 100.
 *
 * @return the configured reader
 */
@Bean
@StepScope
public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
    // Paging query: the sort key is what the provider uses to order the pages.
    Map<String, Order> orderBy = new HashMap<>(1);
    orderBy.put("id", Order.ASCENDING);

    MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
    provider.setSelectClause("id, firstName, lastName, birthdate");
    provider.setFromClause("from Customer");
    provider.setSortKeys(orderBy);

    JdbcPagingItemReader<Customer> pagingReader = new JdbcPagingItemReader<>();
    pagingReader.setDataSource(this.dataSource);
    pagingReader.setFetchSize(100);
    // Map each result-set row onto a Customer through its builder.
    pagingReader.setRowMapper((resultSet, rowIndex) -> Customer.builder()
            .id(resultSet.getLong("id"))
            .firstName(resultSet.getString("firstName"))
            .lastName(resultSet.getString("lastName"))
            .birthdate(resultSet.getString("birthdate"))
            .build());
    pagingReader.setQueryProvider(provider);
    return pagingReader;
}
/**
 * Creates a flat-file writer that targets a freshly created temporary file
 * (prefix {@code customerInfo}, suffix {@code .data}) and prints that file's path to stdout.
 * Lines are produced by {@code MyCustomerLineAggregator}.
 *
 * @return the initialised writer
 * @throws Exception if the temporary file cannot be created or the writer's
 *                   property validation fails
 */
@Bean
public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
    FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();

    File outputFile = File.createTempFile("customerInfo", ".data");
    String outputPath = outputFile.getAbsolutePath();
    System.out.println(">> file is created in: " + outputPath);

    writer.setResource(new FileSystemResource(outputPath));
    writer.setLineAggregator(new MyCustomerLineAggregator());
    writer.afterPropertiesSet();
    return writer;
}
具体ItemProcessor实现
/**
 * Item processor that returns a copy of each {@code Customer} with its first name upper-cased.
 */
@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer, Customer> {

    /**
     * Builds a new {@code Customer} with the same id, last name and birthdate as {@code item}
     * and an upper-cased first name.
     *
     * @param item the customer to transform
     * @return a new customer; its first name is {@code null} when the input's first name is {@code null}
     * @throws Exception declared by the {@code ItemProcessor} contract
     */
    @Override
    public Customer process(Customer item) throws Exception {
        String firstName = item.getFirstName();
        // Locale.ROOT makes the result independent of the JVM default locale
        // (e.g. the Turkish dotted/dotless "i"). It is fully qualified so no new import is needed.
        // A missing first name is passed through instead of failing the item with an NPE.
        String upperFirstName = firstName == null
                ? null
                : firstName.toUpperCase(java.util.Locale.ROOT);
        return new Customer(item.getId(), upperFirstName, item.getLastName(), item.getBirthdate());
    }
}
/**
 * Item processor that lets customers with an even id through and returns {@code null}
 * for all others.
 */
@Component
public class IdFilterProcessor implements ItemProcessor<Customer, Customer> {

    /**
     * @param item the customer to inspect
     * @return {@code item} itself when its id is even, otherwise {@code null}
     * @throws Exception declared by the {@code ItemProcessor} contract
     */
    @Override
    public Customer process(Customer item) throws Exception {
        boolean hasEvenId = item.getId() % 2 == 0;
        return hasEvenId ? item : null;
    }
}
StepListener
Spring Batch 提供许多监听,针对每个组件选择不同的监听器,对业务流程进行切面处理。
在源码 org.springframework.batch.core 包下,我们不难发现这些监听器的父接口是 StepListener。同包下,其子接口有 ItemProcessListener、ItemReadListener、ItemWriteListener 等,这里不一一举例。我们可以根据具体业务处理,选择相对应的 listener 即可。
具体使用,参考如下:
/**
 * Configuration for a demo job whose single fault-tolerant step skips
 * {@code CustomRetryableException} failures and reports them to a skip listener.
 */
@Configuration
public class SkipListenerDemoJobConfiguration {

    /** Number of string items ("0" up to but excluding this value) fed to the step. */
    private static final int ITEM_COUNT = 50;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ItemWriter<String> skipItemWriter;

    @Autowired
    private ItemProcessor<String, String> skipItemProcessor;

    @Autowired
    private MySkipListener mySkipListener;

    /**
     * Step-scoped in-memory reader over the decimal strings "0" through "49".
     *
     * @return a reader backed by a freshly built list
     */
    @Bean
    @StepScope
    public ListItemReader<String> reader() {
        List<String> items = new ArrayList<>(ITEM_COUNT);
        for (int i = 0; i < ITEM_COUNT; i++) {
            items.add(String.valueOf(i));
        }
        // Parameterized instead of raw, so the step builder call is type-checked.
        return new ListItemReader<>(items);
    }

    /**
     * Declares the fault-tolerant step: chunks of 10, skipping up to 10
     * {@code CustomRetryableException} failures and notifying {@code mySkipListener}.
     *
     * @return the assembled step
     */
    @Bean
    public Step skipListenerDemoStep1() {
        return stepBuilderFactory.get("skipListenerDemoStep1")
                .<String, String>chunk(10)
                .reader(reader())
                .processor(skipItemProcessor)
                .writer(skipItemWriter)
                .faultTolerant()
                .skip(CustomRetryableException.class)
                .skipLimit(10)
                .listener(mySkipListener)
                .build();
    }

    /**
     * Declares the job named {@code skipListenerDemoJob1}, made of the single step above.
     *
     * @return the assembled job
     */
    @Bean
    public Job skipListenerDemoJob1() {
        return jobBuilderFactory.get("skipListenerDemoJob1")
                .start(skipListenerDemoStep1())
                .build();
    }
}
/**
 * Skip listener that prints items skipped during processing.
 * Skips in the read and write phases are ignored.
 */
@Component
public class MySkipListener implements SkipListener<String, String> {

    /** Intentionally a no-op: read-phase skips are not reported. */
    @Override
    public void onSkipInRead(Throwable t) {
        // nothing to do
    }

    /** Intentionally a no-op: write-phase skips are not reported. */
    @Override
    public void onSkipInWrite(String item, Throwable t) {
        // nothing to do
    }

    /**
     * Prints the skipped item followed by the exception that caused the skip.
     *
     * @param item the item that was skipped
     * @param t    the failure raised while processing it
     */
    @Override
    public void onSkipInProcess(String item, Throwable t) {
        String report = item + " got exception:" + t;
        System.out.println(report);
    }
}
/**
 * Demo processor that negates each numeric string and deliberately fails on the item "23".
 */
@Component
public class SkipItemProcessor implements ItemProcessor<String, String> {

    /**
     * Prints the item, then either fails (for "23") or returns its negated value as a string.
     *
     * @param item a decimal integer string
     * @return the string form of {@code -item}
     * @throws CustomRetryableException when {@code item} is "23"
     * @throws NumberFormatException    when {@code item} is not a parsable integer
     */
    @Override
    public String process(String item) throws Exception {
        System.out.println("processing item " + item);
        if (item.equalsIgnoreCase("23")) {
            throw new CustomRetryableException("Process failed of item 23");
        }
        // parseInt avoids the needless Integer boxing of Integer.valueOf(...) * -1.
        return String.valueOf(-Integer.parseInt(item));
    }
}
/**
 * Item writer that prints every item of the chunk to stdout, one per line.
 */
@Component
public class SkipItemWriter implements ItemWriter<String> {

    /**
     * @param items the chunk of items to print
     * @throws Exception declared by the {@code ItemWriter} contract
     */
    @Override
    public void write(List<? extends String> items) throws Exception {
        items.forEach(System.out::println);
    }
}