【问题标题】:Spring Batch asynchronous processor configuration for best performanceSpring Batch 异步处理器配置以获得最佳性能
【发布时间】:2018-01-27 06:46:44
【问题描述】:

我在 Spring Batch 中创建异步处理器时遇到问题。 我的处理器从reader 获取ID 并根据来自SOAP 调用的响应创建对象。有时对于 1 个输入 (ID),必须有例如60-100 SOAP 调用,有时只有 1 个。我尝试进行多线程步骤,它一次处理例如 50 个输入,但它没有用,因为 49 个线程在 1 秒内完成了它们的工作并被阻塞,等待这个正在做的线程60-100 SOAP 电话。现在我使用AsyncItemProcessor+AsyncItemWriter,但这个解决方案对我来说效果很慢。由于我的输入 (IDs) 很大,从 DB 读取的大约 25k 个项目我想一次开始约 50-100 个输入。

这是我的配置:

@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DatabaseConfig databaseConfig;
    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }


    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter(){
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";

        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);

        return reader;
    }
}

如何提升我的应用程序?也许我做错了什么?欢迎任何反馈;)

@编辑: 对于 ID 的输入:1 到 100 我想要例如 50 个正在执行处理器的线程。我希望他们不要互相阻止: Thread1 处理输入“1” 2 分钟,此时我希望 Thread2 处理输入“2”、“8”、“64”,这些输入很小,几秒钟内执行。

@Edit2: 我的目标: 我在数据库中有 25k 个 ID,我用 JpaPagingItemReader 读取它们,每个 ID 都由处理器处理。每个项目都是相互独立的。对于每个 ID,我让 SOAP 在循环中调用 0-100 次,然后创建对象,将其传递给 Writer 并保存在数据库中。我如何才能获得此类任务的最佳性能?

【问题讨论】:

  • “这个解决方案对我来说工作缓慢”。那是什么意思?瓶颈是什么?你做过分析吗?
  • 处理器中的这些 SOAP 调用是瓶颈。对于约 60 个调用的 1 个输入,大约需要 3 分钟。它工作缓慢,因为其他线程正在等待这个长的。
  • +1 到@MichaelMinella,请给我们更多关于你所做的和你期望的内容。除此之外,您应该使用 ThreadPoolTask​​Executor,因为简单的异步会为每个任务创建一个新线程
  • 编辑了我的问题 + ThreadPoolTask​​Executor 没有加速处理。
  • 你试过使用spring批处理分区器类吗?您需要将 25k ID 划分为不同的批次,并为每个批次进行处理。

标签: java spring multithreading spring-batch


【解决方案1】:

你应该划分你的工作。像这样添加一个分区步骤:

@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilder.get("partitionedOrderStep1")
            .partitioner(orderStep1)
            .partitioner("orderStep1", new SimplePartitioner())
            .taskExecutor(taskExecutor())
            .gridSize(10)  //Number of concurrent partitions
            .build();
}

然后在您的作业定义中使用该步骤。 .gridSize() 调用配置要同时执行的分区数。如果您的任何 Reader、Processor 或 Writer 对象是有状态的,您需要使用 @StepScope 对其进行注释。

【讨论】:

    【解决方案2】:

    @KCrookedHand:我已经处理了similar kind of scenario,我必须阅读数千并且需要调用 SOAP 服务(我已将其注入 itemReader)以匹配标准。

    我的配置如下所示,基本上你有几个选项来实现并行处理,其中两个是“分区”和“客户端服务器”方法。我选择分区是因为我可以根据我的数据更好地控制我需要多少个分区。

    请使用@MichaelMinella 提到的 ThreadPoolTask​​Executor,以便在适用的情况下使用 tasklet 执行以下步骤。

    <batch:step id="notificationMapper">
                <batch:partition partitioner="partitioner"
                    step="readXXXStep" />
            </batch:step>
        </batch:job>
    
    
        <batch:step id="readXXXStep">
            <batch:job ref="jobRef" job-launcher="jobLauncher"
                job-parameters-extractor="jobParameterExtractor" />
        </batch:step>
    
        <batch:job id="jobRef">
    
            <batch:step id="dummyStep" next="skippedItemsDecision">
                <batch:tasklet ref="dummyTasklet"/>
                <batch:listeners>
                    <batch:listener ref="stepListener" />
                </batch:listeners>
            </batch:step>
    
            <batch:step id="xxx.readItems" next="xxx.then.finish">
                <batch:tasklet>
                    <batch:chunk reader="xxxChunkReader" processor="chunkProcessor"
                        writer="itemWriter" commit-interval="100">
                    </batch:chunk>
                </batch:tasklet>
                <batch:listeners>
                    <batch:listener ref="taskletListener" />
                </batch:listeners>
            </batch:step>
    
            ...
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-14
      • 1970-01-01
      • 2013-05-07
      • 2022-01-20
      • 1970-01-01
      • 2023-03-29
      相关资源
      最近更新 更多