【问题标题】:Executing query inside spring batch processor在spring批处理器中执行查询
【发布时间】:2021-11-27 10:12:36
【问题描述】:

在我的项目中,我需要在 Spring Batch 处理器中执行查询以验证某些字段。

我该怎么做?

编辑:添加来源:

这是步骤的定义:

@Bean
public Step step1(JdbcBatchItemWriter<CaricoDTO> step1Writer) {
    return stepBuilderFactory.get("step1").<CaricoDTO, CaricoDTO>chunk(10).reader(multiResourceItemReader())
            .processor(processorStep1()).writer(step1Writer).build();

}

这是multiResourceItemReader的定义:

@Bean
public MultiResourceItemReader<CaricoDTO> multiResourceItemReader() {
    MultiResourceItemReader<CaricoDTO> resourceItemReader = new MultiResourceItemReader<CaricoDTO>();
    ArrayList<Integer> indexesToRemove = new ArrayList<Integer>();
    Resource[] inputResources = null;
    PathMatchingResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
    try {
        inputResources = patternResolver.getResources(inputPath);
    } catch (IOException e) {
        e.printStackTrace();
    }

    resourceItemReader.setResources(inputResources);
    resourceItemReader.setDelegate(step1Reader());
    resourceItemReader.setComparator(new FileComparator());
    return resourceItemReader;
}

这是step1Reader

@Bean
public FlatFileItemReader<CaricoDTO> step1Reader() {
    FlatFileItemReader<CaricoDTO> reader = new FlatFileItemReader<CaricoDTO>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<CaricoDTO>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer("|") {
                {
                    setNames(new String[] { ..... });
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<CaricoDTO>() {
                {
                    setTargetType(CaricoDTO.class);
                }
            });
        }
    });
    return reader;
}

这是我的处理器 bean:

@Bean
public CaricoDTOItemProcessorStep1 processorStep1() {
    CaricoDTOItemProcessorStep1 processorStep1 = new CaricoDTOItemProcessorStep1();
    return processorStep1;
}

这是我的处理器定义:

public class CaricoDTOItemProcessorStep1 implements ItemProcessor<CaricoDTO, CaricoDTO> {

    private String fileName;
    private static final Logger log = LoggerFactory.getLogger(CaricoDTOItemProcessorStep1.class);
    
    @Override
    public CaricoDTO process(CaricoDTO carico) throws Exception {

        carico.setDataCaricamento(new Date(System.currentTimeMillis()));
        carico.setFileName(carico.getResource().getFilename());
        return carico;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

}

这是我的作家:

@Bean
public JdbcBatchItemWriter<CaricoDTO> step1Writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<CaricoDTO>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql(....)
            .dataSource(dataSource).build();
}

【问题讨论】:

  • 到目前为止你做了什么?您的项目/组件设置如何?一个简单直接的方法是使用JdbcTemplate,但可能还有其他/更好的方法。
  • @tmarwen 我有一个读取器从 csv 文件读取,一个处理器和一个写入器写入 db,但由于域表中不存在值,我必须丢弃一些记录。我使用 h2 作为数据持久性
  • 我仍然没有得到全貌。你能从你的代码库中添加最少的可重现代码吗?您如何执行读/写,正在使用 Spring Batch ItemReader / ItemWriter
  • @tmarwen 我已将信息添加到我的问题中。

标签: java spring jdbc spring-batch


【解决方案1】:

鉴于主要 OP 中的一些细节,我假设通用解决方案会被接受。

鉴于经典的 Spring Batch 项目设置,您应该可以访问连接到目标 javax.sql.DataSourceorg.springframework.jdbc.core.JdbcTemplate bean,您可以简单地将其注入您的批处理协作器组件之一:

  • org.springframework.batch.item.ItemReader
  • org.springframework.batch.item.ItemWriter
  • org.springframework.batch.item.ItemProcessor

假设您需要在继续处理项目之前执行一些验证(并且可以选择丢弃它),您可以注入 JdbcTemplate bean,执行查询以验证您的模型不变量,然后进行相应的操作:

public class CaricoDTOItemProcessorStep1 implements ItemProcessor<CaricoDTO, CaricoDTO> {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;

    private String fileName;
    
    private static final Logger log = LoggerFactory.getLogger(CaricoDTOItemProcessorStep1.class);
    
    @Override
    public CaricoDTO process(CaricoDTO item) throws Exception {
        boolean someValue = jdbcTemplate.queryForObject("SELECT some_field FROM some_table WHERE some_other_field = 0", (rs, rowNum) -> rs.getBoolean(0));
        if (someValue) {
            carico.setDataCaricamento(new Date(System.currentTimeMillis()));
            carico.setFileName(carico.getResource().getFilename());
            return carico;
        } else {
            return null; // causes the item to be discarded from processing
        }
    }
}

不用说,您的查询将取决于您的域用例并且需要更新。

【讨论】:

  • 非常感谢。我会测试你的解决方案。
  • 谢谢。这解决了我的问题!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-10-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多