【发布时间】:2018-03-14 03:31:49
【问题描述】:
我正在使用带有 DefaultMessageListenerContainer 的 pub/sub 模型。我已将并发消费者配置为 5。如何唯一标识每个消费者?
我正在尝试将相应侦听器处理的每个事件存储在地图中。这是我努力寻找哪个消费者处理了我的事件的部分。我该怎么做?
关键是如何在并发消费者情况下唯一地识别消费者。 我对 DMLC 的配置是
@Bean
public DefaultMessageListenerContainer listenerContainers() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestinationName(COMMENT_QUEUE);
container.setPubSubDomain(true);
container.setSessionTransacted(true);
container.setConcurrentConsumers(5);
container.setSubscriptionDurable(true);
container.setMessageListener(datafileSubscriber);
container.start();
return container;
}
如您所知,这将使 datafilesubscriber 在 5 个不同的线程中执行。 编辑 : 我听说我们需要一个带有单独客户端 ID 的连接工厂。
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
connectionFactory.setClientID("subscriber");
return connectionFactory;
}
所以现在我的问题是我应该如何处理我的 dmlc 中的 connectionFactory 参数?我应该使用相同的 connectionFactory bean 还是使用不同的客户端 ID 创建 5 个不同的。
编辑 2: 我的 2 个听众的示例代码订阅了同一个主题,并且都是持久的消费者。当消息发布到主题时,它们都接收到消息并且其中一个处理它,而另一个忽略它,因为它在缓存中看到另一个侦听器处理了它。 我的配置是
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
return connectionFactory;
}
@Bean
public DefaultMessageListenerContainer listenerContainers() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
//container.setConnectionFactory(connectionFactory1());
container.setClientId("consumer1");
container.setDestinationName(COMMENT_QUEUE);
container.setPubSubDomain(true);
container.setSessionTransacted(true);
container.setSubscriptionDurable(true);
container.setMessageListener(datafileSubscriber);
container.start();
return container;
}
@Bean
public DefaultMessageListenerContainer listenerContainers1() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setClientId("consumer2");
container.setDestinationName(COMMENT_QUEUE);
container.setPubSubDomain(true);
container.setSessionTransacted(true);
container.setSubscriptionDurable(true);
container.setMessageListener(datafileSubscriber);
container.start();
return container;
}
我的问题还是一样,由于两个侦听器执行相同,我如何识别正在处理的侦听器?
还告诉我我的配置是否正确?
【问题讨论】:
标签: java concurrency publish-subscribe spring-jms