【Question title】: Spring Batch step after a JMS reader/processor/writer step is not getting triggered
【Posted】: 2020-09-10 01:45:20
【Question】:

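Our Spring Batch job needs two steps.

Step 1: dequeue messages from JMS, process them, and write the output data to a file.

Step 2: a partitioned step with two worker (slave) steps. Each worker step applies a different algorithm to the output data of step 1.

Problem: step 2 is never invoked. We tried attaching a StepExecutionListener to step 1 (and to the writer), but it is never executed. It looks like step 1 stays in a running state indefinitely, so step 2 (the partitioned step) never gets executed.

Observation: when the JMS reader in step 1 is replaced with a normal (file/DB) reader, control passes to step 2.

Note: firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE); is required because we need to keep dequeuing messages one by one, continuously.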

    @Bean
    public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() {
        ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory);
        return activeMQConnectionFactory;
    }

    @Bean
    public ActiveMQQueue defaultQueue() {
        return new ActiveMQQueue("batch-test");
    }

    @Bean
    @DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" })
    public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) {
        JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory);
        firstQueueTemplate.setDefaultDestination(defaultQueue);
        firstQueueTemplate.setSessionTransacted(true);
        firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE);
        return firstQueueTemplate;
    }

    @Bean(name = "partitionerJob")
    public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException {
        return jobs.get("partitionerJob")
          .start(ingestionstep())
          .next(partitionStep())
          .build();
    }




    @Bean
    public Step ingestionstep() throws UnexpectedInputException, MalformedURLException, ParseException {
        System.out.println("ingestionstep forming");
        return steps.get("ingsetionstep")
                .<SPDRIngestScanBO, SPDRIngestScanBO>chunk(1)
                  .reader(jmsItemReader())
                  .processor(ingestionProcessor())
                  .writer(ingestionwriter() )
                  .listener(new StepExecutionListener() {

                                @Override
                                public void beforeStep(StepExecution stepExecution) {
                                    // TODO Auto-generated method stub

                                }

                                @Override
                                public ExitStatus afterStep(StepExecution stepExecution) {
                                    System.out.println("step exit status :"+stepExecution.getExitStatus());
                                    return null;
                                }
                })
                 // .listener(promotionListener())

                  .build();
    }




    @Bean
    @StepScope
    public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
        System.out.println(" Inside partitionStep method ");
        return steps.get("partitionStep")
                 .partitioner("partitionscans", partitioner(null))
                 .gridSize(2)
                 .step(scanStep())
                 .taskExecutor(taskExecutor())
                 .build();
    }

    @Bean
    public JmsItemReader<SPDRIngestScanBO> jmsItemReader() {
        JmsItemReader<SPDRIngestScanBO> jmsItemReader = new JmsItemReader<>();
        jmsItemReader.setJmsTemplate(jmsTemplate);
        jmsItemReader.setItemType(SPDRIngestScanBO.class);

        return jmsItemReader;
    }

    @Bean
    public  SPDRIngestionStepProcessor  ingestionProcessor() {
        return new SPDRIngestionStepProcessor();
    }

    @Bean
    public  SPDRIngestionStepWriter  ingestionwriter() {
        return new SPDRIngestionStepWriter();
    }


    @Bean
    @StepScope
    public ModelsPartitioner partitioner(@Value("#{jobExecutionContext[models]}")  List<SPDRScanModelBO> models) {

        ModelsPartitioner partitioner = new ModelsPartitioner();
        partitioner.setModels(models);
        System.out.println("----partitioner----");
        return partitioner;
    }


    @Bean
    @StepScope
    public Step scanStep() throws UnexpectedInputException, MalformedURLException, ParseException {
        return steps.get("scanstep")
          .<SPDRScanModelBO, SPDRScanResultBO>chunk(1)
          .reader(scanStepReader(null))
          .processor(scanStepProcessor())
          .writer(scanStepWriter())
          .build();
    }


    @Bean
    @StepScope
    public SPDRScanStepReader scanStepReader(@Value("#{stepExecutionContext[model]}") SPDRScanModelBO scanModelBO) {
        System.out.println("----scanStepReader----");
        SPDRScanStepReader scanStepReader = new SPDRScanStepReader();
        scanStepReader.setScanModelBO(scanModelBO);
        return scanStepReader;
    }

    @Bean
    @StepScope
    public SPDRScanStepProcessor scanStepProcessor() {
        SPDRScanStepProcessor scanStepProcessor = new SPDRScanStepProcessor();
        return scanStepProcessor;
    }

    @Bean
    @StepScope
    public SPDRScanStepWriter scanStepWriter() {
        SPDRScanStepWriter scanStepWriter = new SPDRScanStepWriter();
        return scanStepWriter;
    }

    @Bean
    @StepScope
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setQueueCapacity(2);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

【Comments】:

Tags: java spring jms spring-batch exit-code


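【Solution 1】:

In Spring Batch, when you run two steps sequentially, say step1 and then step2, step2 is executed only once step1 has completed.

In your case, step1 reads from a JMS queue with receiveTimeout = Long.MAX_VALUE, so step1 will not complete until that timeout elapses, and therefore step2 will never start.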
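To make the mechanism concrete: a chunk-oriented step keeps asking its reader for items and finishes only when the reader returns null, and a JmsItemReader returns null only when the receive call times out. The sketch below imitates that contract with plain JDK classes, so no Spring or ActiveMQ is involved. The class ReceiveTimeoutSketch and the methods read, runStep1, and runJob are hypothetical names made up for illustration, and the in-memory BlockingQueue is an assumed stand-in for the JMS queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceiveTimeoutSketch {

    /** Stand-in for the item reader: returns null if nothing arrives within the timeout. */
    public static String read(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        // With timeoutMillis = Long.MAX_VALUE this call would block practically forever
        // on an empty queue, which is the situation described in the question.
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Stand-in for step 1: keeps consuming until the reader returns null. */
    public static List<String> runStep1(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        List<String> written = new ArrayList<>();
        String item;
        while ((item = read(queue, timeoutMillis)) != null) {
            written.add(item.toUpperCase()); // "process" and "write" the item
        }
        return written; // reached only after a read has timed out
    }

    /** Runs the two stand-in steps in sequence and records what happened. */
    public static List<String> runJob(List<String> messages, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> trace = new ArrayList<>();
        trace.add("step1 wrote " + runStep1(queue, timeoutMillis));
        trace.add("step2 started"); // step 2 can begin only once step 1 has returned
        return trace;
    }

    public static void main(String[] args) throws InterruptedException {
        // A finite timeout lets step 1 end once the queue has been idle for 200 ms.
        runJob(List.of("a", "b"), 200).forEach(System.out::println);
    }
}
```

In the configuration from the question, the corresponding change would presumably be to pass a finite value to firstQueueTemplate.setReceiveTimeout(...), so that the step ends once the queue has been idle for that long. The trade-off is that the job then stops consuming when the queue goes quiet, so truly continuous consumption would need the job to be launched again, or a different design.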

【Comments】:
