【问题标题】:RabbitTemplate not ending the spring boot + spring batch job appRabbitTemplate 没有结束 Spring Boot + Spring Batch Job 应用程序
【发布时间】:2017-11-13 20:35:32
【问题描述】:

我有一个从RabbitMQ 队列读取的批处理作业。我使用AmqpItemReader 作为读者。我面临的问题是,当批处理作业完成时,应用程序不会自行终止。我不确定我是否正确配置了RabbitTemplate。如果有人能帮我找出我的应用程序没有正确终止的原因,那就太好了。

RabbitConfig类:

@Configuration
@ConfigurationProperties("service.product.config.rabbitmq")
public class RabbitConfig {

  @Setter
  private String host;
  @Setter
  private Integer port;
  @Setter
  private String username;
  @Setter
  private String password;
  @Setter
  private String exchangeName;
  @Setter
  private String queueName;

  @Bean
  ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    return connectionFactory;
  }

  @Bean
  RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {

    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setQueue(queueName);
    rabbitTemplate.setExchange(exchangeName);

    return rabbitTemplate;
  }
}

Job类:

@Configuration
@EnableBatchProcessing
public class ImportJobConfig {

  private final Logger logger = LoggerFactory.getLogger(ImportJobConfig.class);

  @Autowired
  JobBuilderFactory jobBuilderFactory;
  @Autowired
  StepBuilderFactory stepBuilderFactory;
  @Autowired
  RabbitTemplate rabbitTemplate;

  @Bean
  public Job importJob() {
    return jobBuilderFactory.get("importJob")
        .listener(new JobExecutionListener() {
          @Override
          public void beforeJob(JobExecution jobExecution) {
            logger.info("Ready to start the job");
          }

          @Override
          public void afterJob(JobExecution jobExecution) {
            logger.info("Job successfully executed.");
          }
        })
        .incrementer(new RunIdIncrementer())
        .flow(stepBuilderFactory.get("importStep")
            .<String, String>chunk(2)
            .reader(new AmqpItemReader<>(rabbitTemplate))
            .listener(new QueueListener<>())
            .processor(new CustomItemProcessor())
            .writer(new CustomItemWriter())
            .build())
        .end()
        .build();
  }
}

编辑: 我的主要方法类:

@SpringBootApplication
public class BulkImportProductApplication {

  public static void main(String[] args) {
    SpringApplication app = new SpringApplication(BulkImportProductApplication.class);
    app.setWebEnvironment(false);
    app.run(args);
  }

}

编辑结束

我使用的依赖项:

spring-boot-starter-amqp
spring-boot-starter-batch
com.h2database:h2
spring-batch-test
spring-boot-starter-test

如果您需要其他任何东西来帮助您找出我的工作在工作完成后没有终止的原因,请告诉我。

【问题讨论】:

    标签: java spring-boot rabbitmq spring-batch spring-amqp


    【解决方案1】:

    您的main() 方法在哪里?您如何创建应用程序上下文?

    作业完成时,您需要close() 应用程序上下文;您可以使用JobExecutionListener 来检测作业何时完成。

    关闭上下文将调用连接工厂上的destroy()(这可能是保持 JVM 运行的原因)。

    【讨论】:

    • 我用我的主要方法类编辑了这个问题。我不必在以前的批处理作业中做这些事情。事实上,如果我用普通的 ListItemReader 替换我的 AmqpItemReader,我的作业会在作业完成后正确终止。
    • 对,但是使用 AMQP 在作业完成后有一些资源(连接)需要清理。该框架无法为您执行此操作,但它提供了挂钩(例如 JobExecutionListener)。
    • 酷,它现在可以工作了。非常感谢您在我的主要方法中调用 app.run(args).close() 解决了问题。
    • 是的。如果一切都在main 线程上同步完成,那将起作用。
    最近更新 更多