【问题标题】:How to achieve parallel processing to improve processing time如何实现并行处理以提高处理时间
【发布时间】:2019-05-01 17:44:58
【问题描述】:

目前我正在使用 springbatch 以下列方式处理 csv 和 excel 文件。

  1. Reader(将解析 csv/excel 文件和男性 pojo)
  2. 处理器(无论该记录是否存在于 DB 中都会命中 Db)
  3. Writer(将pojo推送到消息队列)

实时我有 50k + 记录要处理,我的代码几乎需要 25 分钟。我想通过实现并行处理来缩短处理时间(这样我们就可以在更短的时间内并行处理)。

但我不知道如何使用 Spring Batch 实现并行处理。任何人都可以指导我如何做到这一点或任何建议以改善处理时间。

@Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("CSV-Async-batch");
    }


    @Bean(name="csvjob")
    public Job job(JobBuilderFactory jobBuilderFactory,StepBuilderFactory stepBuilderFactory,ItemReader<List<CSVPojo>> itemReader,ItemProcessor<List<CSVPojo>,CsvWrapperPojo> itemProcessor,AmqpItemWriter<CsvWrapperPojo> itemWriter){
        Step step=stepBuilderFactory.get("ETL-CSV").<List<CSVPojo>,CsvWrapperPojo>chunk(100)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .taskExecutor(taskExecutor())
                .throttleLimit(40)
                .build();



        Job csvJob= jobBuilderFactory.get("ETL").incrementer(new RunIdIncrementer())
        .start(step).build();

====SynchronizedItemStreamReader 的阅读器==================

@Component
public class Reader extends SynchronizedItemStreamReader<List<CSVPojo>> {

    public static MultipartFile reqFile=null;
    List<CSVPojo> result = new ArrayList<CSVPojo>();

    @Autowired
    private CSVProcessService csvProcessService;

    public static boolean batchJobState ;

    /*public Reader(MultipartFile file){

        this.reqFile=file;
    }*/

    public void setDelegate(ItemStreamReader<List<CSVPojo>> delegate){

        /*try {
            this.read();
        } catch (UnexpectedInputException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ParseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (NonTransientResourceException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
    }


    @Override
    public List<CSVPojo> read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        // TODO Auto-generated method stub
        if(!batchJobState){
        result=csvProcessService.processCSVFile(reqFile);
        System.out.println("in batch job reader");
        batchJobState=true;
        return result;
        }
        return null;
    }

}

提前致谢!!!

【问题讨论】:

    标签: spring-mvc spring-boot spring-batch


    【解决方案1】:

    您可以使用分区技术对输入文件进行分区并并行处理它们。这在参考文档的Partitioning 部分中有详细说明。

    您还可以查看spring-batch-samples 模块中的Local partitioning sampleRemote partitioning sample

    这个问题也有类似的问题,我这里补充一下供参考:

    希望这会有所帮助。

    【讨论】:

    • 谢谢,以上链接很有帮助。我使用了多线程步骤方法。在上面的主线程中添加了我的配置代码。这里我在 csv 文件中有 2 条记录,似乎多个线程异步作用于同一个文件,并且所有线程都读取并发布到队列中,所以我看到发布了 24 条记录(而不是 2 条)。有什么办法可以让多线程同步读取文件?
    • 我的意思是我们如何才能使批处理作业阅读器线程安全?因此文件的读取必须以同步方式发生。
    • SynchronizedItemStreamReader 是您要找的。​​span>
    • 得到错误:在 URL 中创建名称为“reader”的 bean 时出错,init 方法的调用失败;嵌套异常是 java.lang.IllegalArgumentException:需要委托项目阅读器。我必须在我的阅读器中覆盖 public void setDelegate() 吗?请参阅上述线程中的阅读器相关代码。
    • 不,你不需要覆盖setDelegate,你需要在同步的一个中设置代理阅读器。这是一个示例:github.com/spring-projects/spring-batch/blob/master/…
    猜你喜欢
    • 1970-01-01
    • 2018-04-02
    • 2017-08-10
    • 2013-10-23
    • 2022-01-01
    • 2020-03-31
    • 1970-01-01
    • 2019-09-14
    • 2017-07-16
    相关资源
    最近更新 更多