【问题标题】:Parellel Processing Spring Batch StaxEventItemReader并行处理 Spring Batch StaxEventItemReader
【发布时间】:2022-04-26 01:34:38
【问题描述】:

我有一个弹簧批处理作业,定义如下。

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

contentItemReader如下。

 @Bean
 public StaxEventItemReader contentItemReader() {
        StaxEventItemReader reader = new StaxEventItemReader();
        reader.setFragmentRootElementName("ContentItem");
        reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
        reader.setUnmarshaller(contentItemUnmarshaller());
        return reader;
 }

一切都很好,只是它比我想要的慢了一点。我知道这个阅读器不是线程安全的。所以我认为我不能将 taskExecutor 添加到 tasklet 中。 ContentItems 不相互依赖,因此我想将数据并行输入处理器。 ItemProcessing 可能相当耗时。所以虽然我知道我不能拥有多线程阅读器,但我应该能够拥有多线程项目处理。

ItemWriters 也需要是单线程的,因为我使用的是 flatFile ItemWriter。

最好的方法是什么?

【问题讨论】:

    标签: spring spring-batch


    【解决方案1】:

    只需将您的阅读器包装成这样:

    public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {
    
      private ItemReader<T> itemReader;
      private boolean isStream = false;
    
      public void setItemReader(ItemReader<T> itemReader) {
        this.itemReader = itemReader;
        if (itemReader instanceof ItemStream) {
          isStream = true;
        }
      }
    
      @Override
      public void close() {
        if (isStream) {
          ((ItemStream) itemReader).close();
        }
      }
    
      @Override
      public void open(ExecutionContext executionContext) {
        if (isStream) {
          ((ItemStream) itemReader).open(new ExecutionContext());
        }
      }
    
      @Override
      public void update(ExecutionContext executionContext) {
      }
    
      @Override
      public synchronized T read() throws Exception {
        return itemReader.read();
      }
    }
    

    你的作家也是如此。

    请注意,订单不再得到保证。

    编辑:

    在 config.xml 中有一条关于如何使用它的评论。所以,这里是一个简单的例子,如何将 Wrapper 与 FlatFileItemReader 一起使用:

    <batch:step id="convert">
        <batch:tasklet >
            <batch:chunk reader="wrappedReader" writer="..."
                                 processor="..." commit-interval="10000" >
            </batch:chunk>
         </batch:tasklet>
    </batch:step>
    
    <bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
       <property name="itemReader">
          <bean class="org.springframework.batch.item.file.FlatFileItemReader">
              <property .../>
              <property .../>
          </bean>
       </property>
    </bean>
    

    【讨论】:

    • 我假设您打算在步骤配置中将此与 taskExecutor 结合使用。我可以试试这个。我还在自定义 itemWriter 中使用了一个使用 taskExecutor 的解决方案,同时将我的所有项目处理逻辑移到那里。这似乎可行,尽管我没有达到预期的速度,这可能与磁盘 I/O 成为限制因素有关。
    • 是的,你是对的。您必须使用 AsyncTaskExecutor 并且必须设置油门限制。
    • 我终于有机会尝试一下了。虽然在功能上它似乎是一个正确的答案,就像我在 Async SI 工作中看到的那样,加速并不是那么好。可能是由于磁盘 I/O 是有限的因素。
    • @user1629109 我在上面的答案中添加了一个示例
    • 这不是映射器,包装器只是确保多个线程在调用“真正的”阅读器时是同步的。
    【解决方案2】:

    由于版本 3.0.4 Spring Batch 提供了一个开箱即用的包装类(如 Hansjoerg Wingeier):SynchronizedItemStreamReader&lt;T&gt;

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-03-27
      • 1970-01-01
      • 2023-02-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多