【问题标题】:Spring JmsListener not picking up messagesSpring JmsListener 不接收消息
【发布时间】:2021-03-09 19:29:14
【问题描述】:

我有一个 Spring Boot 2.1.4 应用程序,它使用 DefaultJmsListenerContainerFactory 通过 SSL 使用来自 IBM MQ 9.0 的消息。我注意到在某些未知情况下,侦听器与 MQ 的连接仍然存在,并且在日志记录和 MQ 资源管理器中报告为已连接,但是侦听器不使用消息,并且它们保留在队列中,直到 Spring Boot 应用程序重新启动。日志不会在应用程序或队列管理器中报告任何内容。

理想情况下,我希望JmsListener 能够识别失败状态并自行重新连接。我已经阅读了在 MQ 上配置 Heartbeat 或 Keep Alive 并且客户端会有所帮助。 MQ 通道设置为 5 秒的心跳间隔,但不确定如何在 Spring Boot 应用中配置此设置或保持活动状态。

注意:如果 MQ 连接发生异常,我的 JmsListener 会自行重新连接。

Jms 工厂代码:

@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
    defaultJmsListenerContainerFactory.setTransactionManager(jmsTransactionManager(connectionFactory));
    defaultJmsListenerContainerFactory.setSessionTransacted(true);
    defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
    defaultJmsListenerContainerFactory.setErrorHandler(new DefaultJmsErrorHandler());
    defaultJmsListenerContainerFactory.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
    configurer.configure(defaultJmsListenerContainerFactory, connectionFactory);
    return defaultJmsListenerContainerFactory;
}

听众:

@JmsListener(destination = "${sys.to.app.core.queue}", containerFactory = "myFactory")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onMessageReceived(final Message message) throws InboundQueueMessageException {...}
    

【问题讨论】:

  • 您能分享任何您认为可以帮助我们的代码吗?
  • 除非您在 spring 端使用 CCDT,否则 HBINT 将协商 SVRCONN 的值。但是根据您的描述,听起来您并没有断开连接。您使用的是哪个版本的 IBM MQ jar 文件?队列管理器的完整版是什么?
  • 可能是 ibm.mq.pool.timeBetweenExpirationCheck 。见github.com/ibm-messaging/mq-jms-spring
  • 在一些较旧的维护级别中存在一些死锁。
  • @JoshMc IBM MQ jar 9.0.4,Spring JMS jar:5.1.6,IBM MQ jar:9.0.2,IBM MQ Manager:9.1.0.5

标签: java ibm-mq spring-jms


【解决方案1】:

理想情况下,它应该是负责连接失败并重新连接的底层管道,但如果您必须这样做,那么这可能会有所帮助。

假设您有一个侦听器设置,例如:

    @JmsListener(destination = "${queue.name1}",
            containerFactory = "myListenerFactory",
            id = "Q1Object")
    public void receiveOO(MyMessageData data) {
       ...
    }

然后您可以为侦听器工厂注册一个错误处理程序。您可以在其中检查侦听器是否正在运行,如果没有,请重新启动它。



import lombok.SneakyThrows;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.util.ErrorHandler;

import javax.jms.ConnectionFactory;


@EnableJms
@Configuration
public class MQConfigurationAndHandler implements ErrorHandler {

    protected final Log logger = LogFactory.getLog(getClass());

    private final ConnectionFactory connectionFactory;

    // The listener registry allows us to control the @JmsListener endpoints
    private final JmsListenerEndpointRegistry registry;

    public MQConfigurationAndHandler(ConnectionFactory connectionFactory,
                                     JmsListenerEndpointRegistry registry) {
        this.connectionFactory = connectionFactory;
        this.registry = registry;
    }


    @SneakyThrows
    @Override
    public void handleError(Throwable e) {
        logger.warn("Listener error has been thrown");
        logger.warn(e.getMessage());

//        logger.info("Registered listeners IDs are : ");
//        for (String listenerId : registry.getListenerContainerIds()) {
//            logger.info("ID " + listenerId);
//        }

        logger.info("Checking listener");
        MessageListenerContainer mlc = registry.getListenerContainer("Q1Object");
        if (! mlc.isRunning()) {
            logger.warn("Listener has stopped running attempting restart");
            mlc.start();
        } else {
            logger.warn("Listener is running, something else is throwing an error");
            throw e;
        }
    }

    @Bean("myListenerFactory")
    public DefaultJmsListenerContainerFactory myCustomisedListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(this);

        return factory;
    }

}

【讨论】:

  • 我的 JMS 侦听器正在成功重新连接任何捕获的异常(使用框架提供功能)。不幸的是,我正在处理 JMS 未检测到的“静默异常”。
  • 这将是底层异常监听器
猜你喜欢
  • 1970-01-01
  • 2017-07-29
  • 1970-01-01
  • 2019-07-30
  • 2017-02-15
  • 2020-01-04
  • 2020-10-02
  • 2016-03-27
  • 2011-12-12
相关资源
最近更新 更多