【问题标题】:Too many active consumers for ActiveMQ queueActiveMQ 队列的活动消费者过多
【发布时间】:2019-02-26 00:16:16
【问题描述】:

我的 Spring 应用程序使用 ActiveMQ 队列。有两种可能的方法。两种方法的 ActiveMQ 集成的初始部分是相同的:

@Bean
public ConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public Queue notificationQueue() {
    return resolveAvcQueueByJNDIName("java:comp/env/jms/name.not.important.queue");
}

单线程方法:

@Bean
public IntegrationFlow orderNotify() {
    return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory()).destination(notificationQueue()),
            c -> c.poller(Pollers.fixedDelay(QUEUE_POLLING_INTERVAL_MS)
                                 .errorHandler(e -> logger.error("Can't handle incoming message", e))))
                           .handle(...).get();
}

但我想使用多个工作线程来消费消息,所以我将代码从入站适配器重构为消息驱动的通道适配器:

@Bean
public IntegrationFlow orderNotify() {
    return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory()).configureListenerContainer(c -> {
                final DefaultMessageListenerContainer container = c.get();
                container.setMaxConcurrentConsumers(notifyThreadPoolSize);
            }).destination(notificationQueue()))
                           .handle(...).get();
}

问题是应用程序在重新部署到 Tomcat 或为第二种方法重新启动时不会停止 ActiveMQ 的使用者。它在启动期间创建了新的消费者。但是所有新消息都被路由到旧的“死”消费者,因此它们位于“待处理消息”部分并且永远不会出队。

这可能是什么问题?

【问题讨论】:

    标签: java concurrency spring-integration activemq


    【解决方案1】:

    我相信你必须完全停止 Tomcat。通常在重新部署应用程序期间,应停止并正确清除 Spring 容器,但看起来情况并非如此:Tomcat 重新部署钩子缺少一些东西。因此,我建议完全停止它。

    另一种选择是忘记外部 Tomcat,而只需迁移到 Spring Boot 即可启动嵌入式 servlet 容器。这样在重建和重新启动应用程序后不会有任何泄漏。

    【讨论】:

    • 谢谢,我知道 Tomcat 的重启有助于解决这个问题,但这不是我的选择以及 Spring Boot 迁移,因为我必须修补现有项目。我正在尝试找到一个选项来重置容器或手动关闭消费者。
    • 你肯定需要完全关闭一个应用程序上下文,让它清理它已经启动的所有进程。不知道应该在 Tomcat 中配置什么才能正确地做到这一点......
    猜你喜欢
    • 2014-06-27
    • 2012-08-11
    • 1970-01-01
    • 2014-01-22
    • 2019-03-21
    • 1970-01-01
    • 2019-01-05
    • 2016-05-16
    • 1970-01-01
    相关资源
    最近更新 更多