【Question title】: Spring batch with Spring Boot terminates before children process with AsyncItemProcessor
【Posted】: 2015-02-28 22:44:04
【Question】:

I am using Spring Batch with an AsyncItemProcessor, and things are behaving unexpectedly. Let me show the code first:

Following a simple example shown on the Spring Batch project:

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception {// NOSONAR
        System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        //SpringApplication.run(PerfilEletricoApp.class, args);
    }
}

-- EDIT

If I simply make the main thread sleep for a few seconds, giving slf4j time to flush the logs, everything works as expected.

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {

    public static void main(String[] args) throws Exception {// NOSONAR
        //System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);

        Thread.sleep(1000 * 5);
        System.exit(SpringApplication.exit(context));
    }

}

-- END OF EDIT
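
For comparison with the fixed five-second sleep above, a more deterministic way to let worker threads drain is to shut the executor down and wait for it to terminate. The sketch below uses only java.util.concurrent; the class name AwaitInsteadOfSleep and the method runAndAwait are hypothetical, and the 20 ms sleep stands in for the real HTTP call. It only illustrates the waiting pattern and does not explain why the step finishes early.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitInsteadOfSleep {

    /** Runs `tasks` simulated jobs and returns how many completed before the timeout. */
    static int runAndAwait(int tasks, long timeoutSeconds) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // simulated work (assumption, stands in for the HTTP GET)
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // stop accepting new tasks; already queued tasks still run
        boolean finished = pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        if (!finished) {
            pool.shutdownNow(); // give up on whatever is still running
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndAwait(10, 5)); // prints 10
    }
}
```

Spring's ThreadPoolTaskExecutor offers setWaitForTasksToCompleteOnShutdown(true) and setAwaitTerminationSeconds(...) for a similar effect; whether that alone would be enough for this job is not verified here.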

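I am reading a text file with fields, then using the AsyncItemProcessor for multithreaded processing, which consists of an HTTP GET on a URL to fetch some data. I also use a NoOpWriter for the write part. I save the results of the GET in the processor part of the job (using log.trace / log.warn).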

@Configuration
public class HttpClientConfigurer {
    // [... property and configs omitted] 
    @Bean
    public CloseableHttpClient createHttpClient() {
      // ... creates and returns a poolable http client etc
    }
}

About the job:

@Configuration
public class BatchJobConfigurer {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("${async.tps:10}")
    private Integer tps;

    @Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
    private String sourceDir;

    @Bean
    public ItemReader<String> reader() {
        MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
        reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
        reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
        return reader;
    }

    @Bean
    public ItemReader<String> flatItemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(new DefaultLineMapper<String>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "sample-field-001"});
            }});
            setFieldSetMapper(new SimpleStringFieldSetMapper<>());
        }});
        return itemReader;
    }


    @Bean
    public ItemProcessor asyncItemProcessor(){
        AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(processor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<String,OiPaggoResponse> processor(){
        return new PerfilEletricoItemProcessor();
    }

    /**
     * Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
     * @return a NoOpItemWriter<OiPaggoResponse>
     */
    @Bean
    public ItemWriter<OiPaggoResponse> writer() {
        return new NoOpItemWriter<>();
    }

    @Bean
    protected Step step1() throws Exception {
/*
 The problem starts here: if I use processor(), everything ends nicely, but if I insist on
 asyncItemProcessor(), the job ends and the logs from the processor are not written to disk.
*/
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())   
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(tps);
        executor.setMaxPoolSize(tps);
        executor.setQueueCapacity(tps * 1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }
}

-- UPDATE using AsyncItemWriter (working version)

    /* Wrapped writer */
    @Bean
    public ItemWriter asyncItemWriter(){
        AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /*AsyncItemWriter defined on the steps*/
    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

-- Any ideas why the AsyncItemProcessor does not wait for all of its children to finish before sending the OK-Completed signal to the context?

【Comments】:

    Tags: spring asynchronous spring-boot spring-batch spring-integration


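    【Solution 1】:

    The problem is that the AsyncItemProcessor is creating Futures that no one is waiting for. Wrap your NoOpItemWriter in an AsyncItemWriter so that someone waits on the Futures. That will let the job complete as expected.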
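
The mechanism described in this answer can be imitated without Spring. The sketch below is a plain java.util.concurrent model, not Spring Batch's actual implementation: the class FutureUnwrapSketch and its methods processAsync and writeUnwrapped are hypothetical stand-ins for the async processor and the async writer, and the 50 ms sleep simulates the HTTP GET. The point it illustrates is that submitting work returns Futures immediately, and nothing blocks until some component calls get() on them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureUnwrapSketch {

    /** Stand-in for the async processor: submits each item and returns at once. */
    static List<Future<String>> processAsync(ExecutorService pool, List<String> items) {
        List<Future<String>> futures = new ArrayList<>();
        for (String item : items) {
            futures.add(pool.submit(() -> {
                Thread.sleep(50); // simulated HTTP GET latency (assumption)
                return item.toUpperCase();
            }));
        }
        return futures; // no task is guaranteed to have finished yet
    }

    /** Stand-in for the async writer: blocks on every Future before "writing". */
    static List<String> writeUnwrapped(List<Future<String>> futures) throws Exception {
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get()); // waits until this item's task has completed
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = processAsync(pool, Arrays.asList("a", "b", "c"));
            System.out.println(writeUnwrapped(futures)); // prints [A, B, C]
        } finally {
            pool.shutdown();
        }
    }
}
```

If writeUnwrapped were skipped, the caller would move on while the tasks were still running, which matches the symptom in the question.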

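    【Comments】:

    • Michael, this looks so simple, thank you very much. I had created the NoOpWriter and didn't stop to think that the Futures were being created by the AsyncItemProcessor. Thanks a million.
    • Thanks for the pointer. I hit exactly the same problem using AsyncItemProcessor with NoOpItemWriter in my configuration. Wrapping the NoOpItemWriter and using it as the delegate of an AsyncItemWriter solved it.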