【问题标题】:ActiveMQ Artemis and 5.x listeners at same time - NullPointerExceptionActiveMQ Artemis 和 5.x 监听器同时 - NullPointerException
【发布时间】:2020-12-30 04:19:59
【问题描述】:

我有一个旧版 Spring 4.2.1.RELEASE 应用程序,它作为侦听器连接到 ActiveMQ 5.x,现在我们正在添加到 ActiveMQ Artemis 的连接性。对于 Artemis,我们使用持久订阅,因为我们不希望在订阅者关闭和共享订阅时丢失某个主题的消息,因为我们希望选择集群或使用并发来异步处理订阅中的消息。我有单独的ConnectionFactorys 和ListenerContainers,但是从这个不断重复的WARN 日志看来,由于以下 NPE,Artemis DMLC 无法启动:

java.lang.NullPointerException
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
    at java.lang.Thread.run(Unknown Source)

表面上好像找不到方法createSharedDurableConsumer。看看我的AbstractMessageListenerContainer,第856行正在调用method.invoke

/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);

...

Method method = (isSubscriptionDurable() ?
                        createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
    return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}

Artemis 配置:

@Configuration
public class ArtemisConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory artemisConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
                .createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());

        artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
        artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
        artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
        artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
        artemisConnectionFactory
                .setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
        artemisConnectionFactory.setInitialConnectAttempts(
                env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
        artemisConnectionFactory
                .setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
        artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
        artemisConnectionFactory
                .setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
        artemisConnectionFactory.setBlockOnAcknowledge(true);
        artemisConnectionFactory.setBlockOnDurableSend(true);
        artemisConnectionFactory.setCacheDestinations(true);
        artemisConnectionFactory.setConsumerWindowSize(0);
        artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);

        cachingConnectionFactory
        .setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
        cachingConnectionFactory.setReconnectOnException(true);

        return cachingConnectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
            JmsTransactionManager artemisJmsTransactionManager,
            MappingJackson2MessageConverter mappingJackson2MessageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setConnectionFactory(artemisConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setMessageConverter(mappingJackson2MessageConverter);
        factory.setSubscriptionDurable(Boolean.TRUE);
        factory.setSubscriptionShared(Boolean.TRUE);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(artemisJmsTransactionManager);

        return factory;
    }

    private TransportConfiguration[] createTransportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
        String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
        Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);

        primaryTransportParameters.put("host", primaryHostname);
        primaryTransportParameters.put("port", primaryPort);

        return new TransportConfiguration[] {
                new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
                new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
    }
}

我的 pom 使用的是 Artemis 2.10.0 版本。

我该如何解决这个问题?

【问题讨论】:

  • 在我拥有的 AbstractMessageListenerContainer 版本中,第 856 行在该方法上调用 method.invoke。使用显示此内容的代码更新问题。
  • 用我的连接工厂和 dmlc 工厂 bean 更新了问题。没有尝试过更新的 Spring 版本 - 这是一个遗留应用程序,所以我们试图避免这样做。
  • 抱歉 - 我们所做的更改是添加一个新的 Artemis 监听器。那是行不通的。
  • 持久,因为我们不希望在订阅者下降时丢失主题的消息。共享是因为我们希望选择集群或使用并发来异步处理(尽管如果解决了这个问题,我们可能会逃脱)
  • 检查 ConnectionFactory 被传递到我的 dmlc 工厂 bean 我看到:[org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1]。不确定这是否意味着 2.0 不在类路径中。我不记得需要为我的 pom 添加特定的依赖项以获取 jms 规范,但也许这就是我所缺少的?

标签: java activemq spring-jms activemq-artemis


【解决方案1】:

JMS 2.0 规范向后兼容 JMS 1.1,因此请确保您的类路径中只有 JMS 2 规范。我的预感是 Spring 代码中的反射调用变得一团糟,因为它们使用的是 JMS 1.1 规范类而不是正确的 JMS 2 规范类。

【讨论】:

  • ActiveMQ DMLC 连接正常,这是我的 Artemis 之一(或我打算成为我的 Artemis 的那个)不起作用。检查 DefaultJmsListenerContainerFactory,我看起来它是通过 spring-jms 4.2.1.RELEASE 拉进来的
  • 这可能与在 spring jms 之上同时具有 artemis-jms-client 和 activemq-core 的依赖关系有关吗?不确定是否使用了错误的 DMLC 实现。在这一点上,我正在抓住稻草。
  • 我不明白拥有这些依赖项会有什么问题。
  • 将 javax.jms-api 显式添加到我的 pom 并从我的 pom 中的 activemq-pool 和 activemq-core 中排除 geronimo-jms_1.1_spec 解决了这个问题。接受的答案。谢谢!
  • 谢谢,这是救命稻草 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-12-26
  • 2012-07-23
  • 2019-10-06
  • 1970-01-01
  • 2012-09-27
  • 1970-01-01
  • 2022-08-10
相关资源
最近更新 更多