【问题标题】:can we process the multiple files sequentially using spring Batch while multiple threads used to process individual files data..?我们可以使用spring Batch顺序处理多个文件,而多个线程用于处理单个文件数据..?
【发布时间】:2020-03-27 09:35:53
【问题描述】:

我想按顺序处理多个文件,每个文件都需要在多个线程的帮助下进行处理,因此使用了 spring 批处理 FlatFileItemReader 和 TaskExecutor,它对我来说似乎工作正常。正如要求中提到的,我们必须处理多个文件,因此与 FlatFileItemReader 一起,我正在使用 MultiResourceItemReader,它将获取许多文件并在我遇到问题时一一处理。有人可以帮我解决异常的原因吗?修复它的方法是什么..?

org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
   at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
   at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
   at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
   at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
   at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119)

customer2.csv

200,Zoe,Nelson,1973-01-12 17:19:30
201,Vivian,Love,1951-10-31 08:57:08
202,Charde,Lang,1967-02-23 12:24:26

customer3.csv

400,Amelia,Osborn,1972-05-09 09:21:22
401,Gemma,Finch,1989-09-25 23:00:59
402,Orli,Slater,1959-03-30 15:54:32
403,Donovan,Beasley,1986-06-18 14:50:30

customer4.csv

600,Zelenia,Henson,1982-07-03 03:28:39
601,Thomas,Mathews,1954-11-21 20:34:03
602,Kevyn,Whitney,1984-09-21 06:24:25
603,Marny,Leon,1984-06-10 21:32:09
604,Jarrod,Gay,1960-06-22 19:11:04

customer5.csv

800,Imogene,Lee,1966-10-19 17:53:44
801,Mira,Franks,1964-03-08 09:47:43
802,Silas,Dixon,1953-04-11 01:37:51
803,Paloma,Daniels,1962-06-14 17:01:02

我的代码:

@Bean
public MultiResourceItemReader<Customer> multiResourceItemReader() {

    System.out.println("In multiResourceItemReader");
    MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
    reader.setDelegate(customerItemReader());
    reader.setResources(inputFiles);
    return reader;
}

@Bean
public FlatFileItemReader<Customer> customerItemReader() {
    FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
    DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String[] {"id", "firstName", "lastName", "birthdate"});
    customerLineMapper.setLineTokenizer(tokenizer);
    customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
    customerLineMapper.afterPropertiesSet();
    reader.setLineMapper(customerLineMapper);
    return reader;
}

下面的 sn-p 在下面使用时工作正常:

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100).
            reader(customerItemReader())
            .writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
            .build();
}
}

波纹管 sn-p 无法解决上述异常

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100).
            reader(multiResourceItemReader())
            .writer(customerItemWriter()).taskExecutor(taskExecutor()).throttleLimit(10)
            .build();
}

【问题讨论】:

  • 现在更清楚了。谢谢@Mahmoud Ben Hassine。

标签: java spring spring-batch spring-batch-tasklet spring-batch-stream


【解决方案1】:

由于您在多线程步骤中使用阅读器,一个线程可能已经关闭了当前文件,而另一个线程同时试图从该文件中读取。您需要使用SynchronizedItemStreamReader 同步对您的阅读器的访问:

@Bean
public SynchronizedItemStreamReader<Customer> multiResourceItemReader() {
    System.out.println("In multiResourceItemReader");
    MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
    reader.setDelegate(customerItemReader());
    reader.setResources(inputFiles);

    SynchronizedItemStreamReader<Customer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(reader);
    return synchronizedItemStreamReader;
}

【讨论】:

  • 谢谢,@Mahmoud Ben Hassine 似乎一个解决方案非常有效。请分享调试方法。?由于您在多线程步骤中使用阅读器,线程可能已经关闭了当前文件..:- 为什么我只使用 FlatFileItemReader 时不会出现此行为..?
  • 好的,在这种情况下,请接受答案:stackoverflow.com/help/someone-answers。仅 FlatFileItemReader 不会出现此问题,因为在这种情况下,一次只打开一个文件,而使用多资源项读取器时,一个线程可能正在执行方法 close(或 open)资源,而另一个可能正在执行read(因此你的问题)。 SynchronizedItemStreamReader 可以防止这种情况发生。
  • 做到了@Mahmoud Ben Hassine,如果您认为该问题有效,请在问题旁边投赞成票或反对票
猜你喜欢
  • 1970-01-01
  • 2013-09-11
  • 2020-04-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-05-23
  • 1970-01-01
相关资源
最近更新 更多