ItemProcessor

      在开发过程中,我们经常需要在读取数据之后,先经过一系列业务逻辑处理,再把数据写入指定的持久化存储。Spring Batch 为此提供了 ItemProcessor 接口,用于在读取与写入之间对数据进行处理。

      我们可以构建 CompositeItemProcessor 的 Bean,把多个 Processor 按顺序组合在一起,再与 Step 绑定:

  // Builder factories used below to assemble the demo job and its step.
  @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Data source handed to the paging reader in dbJdbcDemoReader().
    @Autowired
    private DataSource dataSource;

    // The two delegates that processorDemoProcessor() registers, in this order.
    @Autowired
    private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;

    @Autowired
    private ItemProcessor<Customer, Customer> idFilterProcessor;

    /**
     * Defines the job named {@code processorDemoJob}, whose only step is the
     * one returned by {@code processorDemoStep()}.
     *
     * @return the assembled job
     * @throws Exception propagated from {@code processorDemoStep()}
     */
    @Bean
    public Job processorDemoJob() throws Exception {
        return jobBuilderFactory.get("processorDemoJob").start(processorDemoStep()).build();
    }

    /**
     * Defines the chunk-oriented step named {@code processorDemoStep}.
     *
     * <p>Items are handled in chunks of 100: read by {@code dbJdbcDemoReader()},
     * passed through {@code processorDemoProcessor()}, and written by
     * {@code flatFileDemoFlatFileWriter()}.
     *
     * @return the assembled step
     * @throws Exception propagated from {@code flatFileDemoFlatFileWriter()}
     */
    @Bean
    public Step processorDemoStep() throws Exception {
        JdbcPagingItemReader<Customer> source = dbJdbcDemoReader();
        CompositeItemProcessor<Customer,Customer> pipeline = processorDemoProcessor();
        FlatFileItemWriter<Customer> sink = flatFileDemoFlatFileWriter();

        return stepBuilderFactory.get("processorDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(source)
                .processor(pipeline)
                .writer(sink)
                .build();
    }

    /**
     * Builds the composite processor used by the demo step.
     *
     * <p>The delegates are registered in this order: first-name upper-casing,
     * then id filtering.
     *
     * @return a composite processor wrapping both delegates
     */
    @Bean
    public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){
        List<ItemProcessor<Customer,Customer>> delegates = new ArrayList<>();
        delegates.add(firstNameUpperCaseProcessor);
        delegates.add(idFilterProcessor);

        CompositeItemProcessor<Customer,Customer> composite = new CompositeItemProcessor<>();
        composite.setDelegates(delegates);
        return composite;
    }

    /**
     * Step-scoped paging reader that loads {@code Customer} rows, sorted by
     * ascending {@code id}, using a MySQL paging query.
     *
     * <p>The page size is set explicitly to 100 so that it agrees with the
     * fetch size configured here and with the chunk size of 100 used by
     * {@code processorDemoStep()}.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        // Without an explicit page size the reader falls back to its small
        // default, so a fetch size of 100 could never be used by a single page.
        reader.setPageSize(100);
        reader.setFetchSize(100);
        reader.setRowMapper((rs, rowNum) -> Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        // Paging needs a sort key; rows are ordered by id ascending.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Builds a flat-file writer that writes {@code Customer} items to a newly
     * created temporary file, using {@code MyCustomerLineAggregator} to format
     * each line. The file's absolute path is printed to standard output.
     *
     * @return the configured writer
     * @throws Exception if the temporary file cannot be created or the
     *         writer's {@code afterPropertiesSet()} check fails
     */
    @Bean
    public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();

        File target = File.createTempFile("customerInfo", ".data");
        String location = target.getAbsolutePath();
        System.out.println(">> file is created in: " + location);

        writer.setResource(new FileSystemResource(location));
        writer.setLineAggregator(new MyCustomerLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }

       具体ItemProcessor实现

/**
 * Item processor that upper-cases a customer's first name.
 *
 * <p>A new {@code Customer} is returned for every item; the incoming item is
 * not modified.
 */
@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> {

    /**
     * Copies the customer, replacing the first name with its upper-case form.
     *
     * @param item the customer to transform
     * @return a new customer with the same id, last name and birthdate and an
     *         upper-cased first name ({@code null} if the first name was {@code null})
     */
    @Override
    public Customer process(Customer item) throws Exception {
        String firstName = item.getFirstName();
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (a Turkish default would otherwise map "i" to a dotted capital I).
        // A missing first name is passed through instead of throwing.
        String upperCased = firstName == null
                ? null
                : firstName.toUpperCase(java.util.Locale.ROOT);
        return new Customer(item.getId(), upperCased, item.getLastName(),
                item.getBirthdate());
    }
}

 

/**
 * Item processor that lets through only customers whose id is even.
 */
@Component
public class IdFilterProcessor implements ItemProcessor<Customer,Customer> {

    /**
     * Returns the item unchanged when its id is even, otherwise {@code null}.
     *
     * @param item the customer to check
     * @return {@code item} for an even id; {@code null} for an odd id
     */
    @Override
    public Customer process(Customer item) throws Exception {
        boolean evenId = item.getId() % 2 == 0;
        return evenId ? item : null;
    }
}

 

 

StepListener

     Spring Batch 提供许多监听,针对每个组件选择不同的监听器,对业务流程进行切面处理。

     在源码 org.springframework.batch.core 包下,我们不难发现所有监听器的父接口是 StepListener。同包下的子接口有 ItemProcessListener、ItemReadListener、ItemWriteListener 等,这里不一一列举。我们根据具体业务选择相对应的 listener 即可。

Spring Batch 之 ItemProcessor 和 StepListener

    具体使用,参考如下:

    

/**
 * Job configuration demonstrating a fault-tolerant step that reports skipped
 * items through a skip listener.
 *
 * <p>The step reads the strings "0" to "49", runs them through the injected
 * processor and writer in chunks of 10, and is configured to skip
 * {@code CustomRetryableException} with a skip limit of 10.
 */
@Configuration
public class SkipListenerDemoJobConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ItemWriter<String> skipItemWriter;

    @Autowired
    private ItemProcessor<String, String> skipItemProcessor;

    @Autowired
    private MySkipListener mySkipListener;

    /**
     * Step-scoped reader over the decimal strings "0" through "49".
     *
     * @return a list-backed reader holding the 50 generated items
     */
    @Bean
    @StepScope
    public ListItemReader<String> reader() {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            items.add(String.valueOf(i));
        }

        // Parameterized (not raw) so the step builder sees an ItemReader<String>.
        return new ListItemReader<>(items);
    }

    /**
     * Defines the fault-tolerant step named {@code skipListenerDemoStep1}.
     *
     * @return the assembled step
     */
    @Bean
    public Step skipListenerDemoStep1() {
        return stepBuilderFactory.get("skipListenerDemoStep1")
                .<String,String>chunk(10)
                .reader(reader())
                .processor(skipItemProcessor)
                .writer(skipItemWriter)
                .faultTolerant()
                .skip(CustomRetryableException.class)
                .skipLimit(10)
                .listener(mySkipListener)
                .build();
    }

    /**
     * Defines the job named {@code skipListenerDemoJob1}, consisting of
     * {@link #skipListenerDemoStep1()} only.
     *
     * @return the assembled job
     */
    @Bean
    public Job skipListenerDemoJob1() {
        return jobBuilderFactory.get("skipListenerDemoJob1")
                .start(skipListenerDemoStep1())
                .build();
    }
}
/**
 * Skip listener for {@code String} items. Items skipped during processing are
 * reported on standard output; skips during read and write trigger no action.
 */
@Component
public class MySkipListener implements SkipListener<String, String> {

    /** No action is taken for skips raised while reading. */
    @Override
    public void onSkipInRead(Throwable t) {
        // no-op
    }

    /** No action is taken for skips raised while writing. */
    @Override
    public void onSkipInWrite(String item, Throwable t) {
        // no-op
    }

    /**
     * Prints the skipped item together with the exception that caused the skip.
     *
     * @param item the item that was skipped
     * @param t the exception raised while processing {@code item}
     */
    @Override
    public void onSkipInProcess(String item, Throwable t) {
        String report = item + " got exception:" + t;
        System.out.println(report);
    }
}
/**
 * Demo processor that negates numeric string items and deliberately fails on
 * the item "23".
 */
@Component
public class SkipItemProcessor implements ItemProcessor<String, String> {

    /** The single item value for which processing is made to fail. */
    private static final String FAILING_ITEM = "23";

    /**
     * Logs the item, then returns its negated integer value as a string.
     *
     * @param item a decimal integer in string form
     * @return the string form of {@code -item}
     * @throws CustomRetryableException when {@code item} is "23"
     * @throws NumberFormatException when {@code item} is not a parsable integer
     */
    @Override
    public String process(String item) throws Exception {
        System.out.println("processing item " + item);
        if (FAILING_ITEM.equals(item)) {
            throw new CustomRetryableException("Process failed of item 23");
        }
        // parseInt avoids the Integer boxing/unboxing of Integer.valueOf(item) * -1.
        return String.valueOf(-Integer.parseInt(item));
    }

}
/**
 * Item writer that prints every item of a chunk on its own line to standard
 * output.
 */
@Component
public class SkipItemWriter implements ItemWriter<String> {

    /**
     * Prints each item in iteration order.
     *
     * @param items the items of the current chunk
     */
    @Override
    public void write(List<? extends String> items) throws Exception {
        items.forEach(line -> System.out.println(line));
    }

}

 

相关文章:

  • 2021-06-23
  • 2021-05-31
  • 2021-11-07
  • 2021-09-12
  • 2022-02-16
  • 2021-04-22
猜你喜欢
  • 2022-12-23
  • 2021-09-06
  • 2021-06-16
  • 2021-11-05
  • 2021-06-18
  • 2022-01-11
相关资源
相似解决方案