【发布时间】:2019-02-26 00:16:16
【问题描述】:
我的 Spring 应用程序使用 ActiveMQ 队列。有两种可能的方法。两种方法的 ActiveMQ 集成的初始部分是相同的:
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public Queue notificationQueue() {
return resolveAvcQueueByJNDIName("java:comp/env/jms/name.not.important.queue");
}
单线程方法:
@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory()).destination(notificationQueue()),
c -> c.poller(Pollers.fixedDelay(QUEUE_POLLING_INTERVAL_MS)
.errorHandler(e -> logger.error("Can't handle incoming message", e))))
.handle(...).get();
}
但我想使用多个工作线程来消费消息,所以我将代码从入站适配器重构为消息驱动的通道适配器:
@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory()).configureListenerContainer(c -> {
final DefaultMessageListenerContainer container = c.get();
container.setMaxConcurrentConsumers(notifyThreadPoolSize);
}).destination(notificationQueue()))
.handle(...).get();
}
问题是应用程序在重新部署到 Tomcat 或为第二种方法重新启动时不会停止 ActiveMQ 的使用者。它在启动期间创建了新的消费者。但是所有新消息都被路由到旧的“死”消费者,因此它们位于“待处理消息”部分并且永远不会出队。
这可能是什么问题?
【问题讨论】:
标签: java concurrency spring-integration activemq