【问题标题】:Concurrent Consumers in JMS publisher/ Subscriber modelJMS 发布者/订阅者模型中的并发消费者
【发布时间】:2018-03-14 03:31:49
【问题描述】:

我正在使用带有 DefaultMessageListenerContainer 的 pub/sub 模型。我已将并发消费者配置为 5。如何唯一标识每个消费者?

我正在尝试将相应侦听器处理的每个事件存储在地图中。这是我努力寻找哪个消费者处理了我的事件的部分。我该怎么做?

关键是如何在并发消费者情况下唯一地识别消费者。 我对 DMLC 的配置是

@Bean
    public DefaultMessageListenerContainer listenerContainers() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setConcurrentConsumers(5);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    } 

如您所知,这将使 datafilesubscriber 在 5 个不同的线程中执行。 编辑 : 我听说我们需要一个带有单独客户端 ID 的连接工厂。

  @Bean
        public ActiveMQConnectionFactory connectionFactory(){
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
            connectionFactory.setClientID("subscriber");
            return connectionFactory;
        }

所以现在我的问题是我应该如何处理我的 dmlc 中的 connectionFactory 参数?我应该使用相同的 connectionFactory bean 还是使用不同的客户端 ID 创建 5 个不同的。

编辑 2: 我的 2 个听众的示例代码订阅了同一个主题,并且都是持久的消费者。当消息发布到主题时,它们都接收到消息并且其中一个处理它,而另一个忽略它,因为它在缓存中看到另一个侦听器处理了它。 我的配置是

@Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        return connectionFactory;
    }
@Bean
    public DefaultMessageListenerContainer listenerContainers() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        //container.setConnectionFactory(connectionFactory1());
        container.setClientId("consumer1");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainers1() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setClientId("consumer2");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    } 

我的问题还是一样,由于两个侦听器执行相同,我如何识别正在处理的侦听器?

还告诉我我的配置是否正确?

【问题讨论】:

    标签: java concurrency publish-subscribe spring-jms


    【解决方案1】:

    对主题使用并发没有多大意义,因为消息侦听器将收到 5 次相同的消息。

    如果这真的是你想要的,你需要 5 个并发=1 的容器。

    标准 JMS 不支持在主题上竞争的消费者(一些代理对此有扩展,但您必须参考代理的文档)。

    【讨论】:

    • 其实我想让那些消费者耐用。我的配置是否有效?您能否建议一种方式,让所有 5 个消费者都收到消息并且它们是持久的并且我必须能够识别它们?
    • 你能看到 EDIT 部分并澄清所需的 ConnectionFactory 吗?
    • 只要你使用这样一个简单的ConnectionFactory,你就可以在容器上而不是在工厂上设置clientId。或者你可以使用 5 个 CF。
    • 嘿 Gary,看看编辑 2。请回答问题。
    • 您需要为每个单独的侦听器实例,并使用客户端 ID 对其进行配置。也就是说,这个用例对我来说似乎很奇怪。目前尚不清楚您要达到的目标。
    猜你喜欢
    • 2017-07-17
    • 2017-02-20
    • 2015-06-10
    • 2013-08-04
    • 1970-01-01
    • 2016-06-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多