【Question Title】: How to partition steps in spring-batch?
【Posted】: 2019-12-11 06:03:10
【Question Description】:

I am learning Spring Batch and wrote a simple application to play with it. Per my requirements, I read from a single csv file, apply some transformations, and insert into a database.

I have the following configuration:

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

   @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_BEFORE!!!!!!!!!!!!!!!!");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_AFTER!!!!!!!!!!!!!!!!");

                    }
                })
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }

public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>()
        .name("csvPersonReader")
        .resource(csvResource)
        .delimited()
        .names(new String[]{"firstName", "lastName"})
        .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
            setTargetType(Person.class);
        }})
        .build();
}

Now I want to execute that step using 10 threads. As far as I understand, I need the partitioning feature for this. I found several examples of it, but they all use XML configuration; I would prefer Java configuration.

How can I achieve that?

P.S.

I tried the following approach:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(1);
    TaskletStep step1 = stepBuilderFactory.get("step1")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .taskExecutor(taskExecutor)
            .build();

    return step1;
}

But my application hangs. Moreover, that is not partitioning; it can only run on a single PC.
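For reference, the hang is consistent with two issues in the attempt above: the pool is effectively single-threaded (`corePoolSize(1)`), and `FlatFileItemReader` keeps per-read state and is not thread-safe. A minimal multi-threaded (still non-partitioned) variant could look like the sketch below; the `SynchronizedItemStreamReader` wrapper and `throttleLimit` are assumptions added here, not part of the original attempt:

```java
@Bean
public Step multiThreadedStep(JdbcBatchItemWriter<Person> writer) {
    // FlatFileItemReader keeps per-read state; synchronize access across threads.
    SynchronizedItemStreamReader<Person> syncReader = new SynchronizedItemStreamReader<>();
    syncReader.setDelegate(reader());

    return stepBuilderFactory.get("multiThreadedStep")
            .<Person, Person>chunk(10)
            .reader(syncReader)
            .processor(processor())
            .writer(writer)
            .taskExecutor(new SimpleAsyncTaskExecutor("batch-"))
            .throttleLimit(10) // cap concurrent chunk executions at 10
            .build();
}
```

Note that a multi-threaded step like this generally cannot restart from where it failed, which is one reason the answers below reach for partitioning instead.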

【Question Discussion】:

    Tags: java spring spring-boot spring-batch


    【Solution 1】:

    Your configuration is wrong; follow the one below. You need to decide the logic by which you want to partition. Note how the partitioner's partition method builds a map with one ExecutionContext per partition.

    Follow the code below:

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner("slaveStep", partitioner())
                .step(slaveStep())
                .taskExecutor(taskExecutor())
                .build();
    }
    
    @Bean
    public CustomPartitioner partitioner() {
        CustomPartitioner partitioner = new CustomPartitioner();
        return partitioner;
    }
    

    public class CustomPartitioner implements Partitioner {

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> map = new HashMap<>(gridSize);
            int i = 0;
            for (Resource resource : resources) { // 'resources' is whatever collection you split over
                ExecutionContext context = new ExecutionContext();
                context.putString("keyName", ""); // depends on what logic you want to use to split
                map.put("PARTITION_KEY" + i, context);
                i++;
            }
            return map;
        }
    }
    
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
    
    @Bean
    public Step slaveStep(JdbcBatchItemWriter<Person> writer)
            throws UnexpectedInputException, MalformedURLException, ParseException {
        return stepBuilderFactory.get("slaveStep")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
    
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_BEFORE!!!!!!!!!!!!!!!!");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_AFTER!!!!!!!!!!!!!!!!");
                    }
                })
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }
    

    【Discussion】:

    • Could you elaborate on the partitioner implementation? It is unclear what this means: context.putString("keyName", ""); // depends on what logic you want to use to split
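To make the comment above concrete: one common splitting logic is to divide a known item count into contiguous ranges and put each range's bounds into its partition's ExecutionContext, where a step-scoped reader picks them up. The arithmetic can be sketched in plain Java (no Spring types; the class and method names here are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangeSplitter {

    /**
     * Splits [0, totalItems) into at most gridSize contiguous ranges.
     * Returns partition name -> {startInclusive, endExclusive}; in a real
     * Partitioner these two numbers would go into each ExecutionContext.
     */
    public static Map<String, long[]> split(long totalItems, int gridSize) {
        Map<String, long[]> partitions = new LinkedHashMap<>();
        long chunk = (totalItems + gridSize - 1) / gridSize; // ceiling division
        long start = 0;
        int i = 0;
        while (start < totalItems) {
            long end = Math.min(start + chunk, totalItems);
            partitions.put("partition" + i, new long[]{start, end});
            start = end;
            i++;
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, long[]> p = RangeSplitter.split(100, 10);
        System.out.println(p.size());               // 10 partitions of 10 items each
        System.out.println(p.get("partition9")[1]); // last range ends at 100
    }
}
```

For the file-per-partition case in Solution 1, Spring Batch also ships a ready-made `MultiResourcePartitioner` that assigns one resource to each partition, so a custom class is only needed for logic like the range split above.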
    【Solution 2】:

    You can use the code below to implement batch partitioning.

    @Configuration
    public class DemoJobBatchConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(DemoJobBatchConfiguration.class);
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        @Qualifier("applicaionDS")
        public DataSource dataSource;
    
        @Autowired
        UserWritter userWriter;
    
        @Bean("demoJob")
        public Job partitionJob(JobNotificationListener listener, JobBuilderFactory jobBuilderFactory,
                @Qualifier("demoPartitionStep") Step demoPartitionStep) {
            return jobBuilderFactory.get("demoJob").incrementer(new RunIdIncrementer()).listener(listener)
                    .start(demoPartitionStep).build();
        }
    
        @Bean(name = "demoPartitionStep")
        public Step demoPartitionStep(Step demoSlaveStep, StepBuilderFactory stepBuilderFactory) {
            return stepBuilderFactory.get("demoPartitionStep").partitioner("demoPartitionStep", demoPartitioner())
                    .gridSize(21).step(demoSlaveStep).taskExecutor(jobTaskExecutor()).build();
        }
    
        @Bean(name = "demoPartitioner", destroyMethod = "")
        public Partitioner demoPartitioner() {
            DemoPartitioner partitioner = new DemoPartitioner();
            // partitioner.partition(20);
            return partitioner;
        }
    
        @Bean
        public Step demoSlaveStep(ItemReader<User> demoReader, ItemProcessor<User, User> demoJobProcessor) {
            return stepBuilderFactory.get("demoSlaveStep").<User, User>chunk(3).reader(demoReader)
                    .processor(demoJobProcessor).writer(userWriter).build();
        }
    
        @Bean(name = "demoReader")
        @StepScope
        public JdbcCursorItemReader<User> demoReader(@Value("#{stepExecutionContext[SQL]}") String SQL,
                @Value("#{jobParameters[JOB_PARM]}") String jobParm,
                @Value("#{jobExecutionContext[jobExecutionParameter]}") String jobExecutionParameter) {
            LOGGER.info("---------------------- demoReader ------------------------------- " + SQL);
            LOGGER.info(" jobParm : " + jobParm);
            LOGGER.info(" jobExecutionParameter : " + jobExecutionParameter);
    
            JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
            reader.setDataSource(this.dataSource);
            reader.setFetchSize(200);
            reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
            reader.setSql(SQL);
            return reader;
        }
    
        @Bean(name = "demoJobProcessor")
        @StepScope
        public ItemProcessor<User, User> demoJobProcessor() throws Exception {
            LOGGER.info(" DemoJobBatchConfiguration: demoJobProcessor  ");
            return new UserProcessor();
        }
    
        /*
         * @Bean public ItemWriter<User> demoWriter() { return users -> { for (User user
         * : users) { if (LOGGER.isInfoEnabled()) { LOGGER.info("user read is :: " +
         * user.toString()); } } if (LOGGER.isInfoEnabled()) {
         * LOGGER.info("%%%%%%%%%%%%%%%%%%%%% demoWriter %%%%%%%%%%%%%%%%%%%%% "); } };
         * }
         */
    
        @Bean
        public TaskExecutor jobTaskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            // there are 21 sites currently hence we have 21 threads
            taskExecutor.setMaxPoolSize(30);
            taskExecutor.setCorePoolSize(25);
            taskExecutor.afterPropertiesSet();
            return taskExecutor;
        }
    
    }
    

    public class DemoPartitioner implements Partitioner {
    
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
    
            Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    
            int range = 3;
            int fromId = 1;
            int toId = range;
    
            for (int i = fromId; i <= gridSize;) {
                ExecutionContext executionContext = new ExecutionContext();
                String SQL = "SELECT * FROM CUSTOMER WHERE ID BETWEEN " + fromId + " AND " + toId;
                System.out.println("SQL : " + SQL);
                executionContext.putInt("fromId", fromId);
                executionContext.putInt("toId", toId);
                executionContext.putString("SQL", SQL);
                executionContext.putString("name", "Thread" + i);
                result.put("partition" + i, executionContext);
                fromId = toId + 1;
                i = fromId;
                toId += range;
            }
            return result;
        }
    
    }
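The range arithmetic in DemoPartitioner above can be exercised on its own to see exactly which SQL each partition receives. The plain-Java mirror below (class and method names are illustrative; the CUSTOMER table and fixed range width of 3 are the answer's assumptions) also shows a quirk of that loop: partition keys are named by the starting id, not by a sequential index, so you get partition1, partition4, partition7, and so on:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IdRangeDemo {

    /** Mirrors DemoPartitioner's loop: fixed-width id ranges until gridSize is exceeded. */
    public static Map<String, String> sqlByPartition(int gridSize, int range) {
        Map<String, String> result = new LinkedHashMap<>();
        int fromId = 1;
        int toId = range;
        for (int i = fromId; i <= gridSize; ) {
            result.put("partition" + i,
                    "SELECT * FROM CUSTOMER WHERE ID BETWEEN " + fromId + " AND " + toId);
            fromId = toId + 1;
            i = fromId; // key index jumps to the next range's starting id
            toId += range;
        }
        return result;
    }

    public static void main(String[] args) {
        // With gridSize 9 and range 3 this yields partition1, partition4, partition7.
        sqlByPartition(9, 3).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```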
    

    【Discussion】:
