【问题标题】:Spring Integration: How to use SingleConnectionFactory with ActiveMQ?Spring 集成:如何将 SingleConnectionFactory 与 ActiveMQ 一起使用?
【发布时间】:2020-11-17 02:26:43
【问题描述】:

Spring Boot 2.3.1.RELEASE。

使用spring.jms.cache.enabled=true(默认),Spring 创建一个CachingConnectionFactory

    @ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
            matchIfMissing = true)
    static class CachingConnectionFactoryConfiguration {

这很糟糕,因为it shouldn't be used with DefaultMessageListenerContainer。我认为这就是为什么我的某些消息会“丢失”直到它们突然重新出现的原因。

使用spring.jms.cache.enabled=false,Spring 创建一个ActiveMQConnectionFactory

    @ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
    ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        return createJmsConnectionFactory(properties, factoryCustomizers);
    }

    private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,

这很糟糕,因为每次轮询都会创建一个到代理的新连接——数百个连接淹没了我的代理。

虽然我的问题的解决方案是使用SingleConnectionFactory。在AbstractPollingMessageListenerContainer.MessageListenerContainerResourceFactory我看到了:

    public Connection createConnection() throws JMSException {
        if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
            Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
            return new SingleConnectionFactory(sharedCon).createConnection();
        }

所以我想我会:

Jms.channel(connectionFactory)
    .cacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION)

但事实证明,这个方法永远不会被调用,只有JmsAccessor.createConnection() 会创建一个ActiveMQConnectionFactory。我的缓存级别无效。

那么如何正确使用SingleConnectionFactory呢?

【问题讨论】:

    标签: spring spring-integration activemq spring-jms


    【解决方案1】:

    如果您有可变并发,缓存工厂只是 DMLC 的问题。

    只需将SingleConnctionFactory 定义为@Bean 并使用Jms.channel(mySingleCF())...

    编辑

    @SpringBootApplication
    public class So63120705Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So63120705Application.class, args).close(); // JVM should exit
        }
    
        @Bean
        public ApplicationRunner runner(ConnectionFactory jmsConnectionFactory, IntegrationFlowContext context,
                JmsTemplate template) {
    
            return args -> {
                SingleConnectionFactory connectionFactory = new SingleConnectionFactory(jmsConnectionFactory);
    
                IntegrationFlow flow = f -> f.channel(Jms.channel(connectionFactory)
                                    .destination("foo"))
                            .handle(System.out::println);
    
                context.registration(flow)
                    .id("jms")
                    .addBean(connectionFactory)
                    .register();
    
                template.convertAndSend("foo", "test");
                Thread.sleep(5_000);
            };
        }
    
    }
    
    spring.jms.cache.enabled=false
    

    【讨论】:

    • 刚刚测试过,它按预期工作。可能您对Jms.channel() 的性质感到困惑。与应用了CACHE_CONNECTIONDefaultMessageListenerContainer 容器一起,它还有一个JmsTemplate 部分,用于将消息发送到JMS。而这个确实在工厂的互动中创造了一种新的联系。
    • 好吧,用这个Jms.channel(new SingleConnectionFactory(connectionFactory)) 进行了测试,看起来目标ActiveMQConnectionFactory.createConnection() 真的只被调用了一次:Mockito.verify(this.connectionFactory).createConnection()
    • 非常感谢您的支持,加里。我尝试按照您的建议进行操作,但我不确定如何正确地将 SingleConnectionFactory 作为 bean,而不必创建自己的 ActiveMQConnectionFactory 并自己完成所有属性绑定。与此同时,我试了一下spring.activemq.pool.enable=true(和org.messaginghub:pooled-jms),它奏效了。一旦时间压力消失,我会再试一次你的解决方案:)
    • 没错;只要你定义了ConnectionFactory 类型的任何bean,你就可以独立使用@ConditionalOnMissingBean(ConnectionFactory.class)。不幸的是,虽然 Artem 的建议可以解决这个问题,但它会导致另一个问题,因为它不是 @Bean 并且在上下文关闭且 JVM 不退出时永远不会调用 destroy()。一种解决方案是动态注册流程,这允许您将其添加为 bean - 请参阅我的答案的编辑。
    猜你喜欢
    • 2016-01-23
    • 2016-05-08
    • 1970-01-01
    • 2011-02-23
    • 1970-01-01
    • 1970-01-01
    • 2014-03-07
    • 2015-12-04
    • 2015-03-14
    相关资源
    最近更新 更多