【发布时间】:2019-04-26 01:55:07
【问题描述】:
我正在尝试使用 solace 作为代理设置带有 jms 的主题端点的最大消费者计数,因此为了增加负载,可以在 cloudfoundry 中启动应用程序的多个实例,并且多个订阅者可以消费同一主题的消息.
我尝试了以下设置的多种组合(setConcurrency(), setConcurrentConsumers(), setMaxConcurrentConsumers(),(20 作为任意高数字)。从文档来看,我肯定需要使用 setMaxConcurrentConsumers() 并将其设置为适当的高值。
当我部署应用程序时,会创建主题端点,但是当我查看 solace 管理界面时,最大消费者计数始终为 1(可以在此处看到:Queues -> Topic Endpoints -> select endpoint -> Configured Limit),即使它应该是 20 . 所以第二个消费者是无法连接的。我不想每次部署应用时都手动设置。
import javax.jms.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@Configuration
public class ProducerConfiguration {
private static final Log logger = LogFactory.getLog(SolaceController.class);
@Value("${durable_subscription}")
private String subscriptionName;
@Value("${topic_name}")
private String topic_name;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public JmsTemplate jmsTemplate() {
CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
JmsTemplate jmst = new JmsTemplate(ccf);
jmst.setPubSubDomain(true);
return jmst;
}
@Bean
public Session configureSession(ConnectionFactory connectionFactory) throws JMSException {
return connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
private TextMessage lastReceivedMessage;
public class SimpleMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
lastReceivedMessage = (TextMessage) message;
try {
logger.info("Received message : " + lastReceivedMessage.getText());
} catch (JMSException e) {
logger.error("Error getting text of the received TextMessage: " + e);
}
} else {
logger.error("Received message that was not a TextMessage: " + message);
}
}
}
@Bean
public DefaultMessageListenerContainer orderMessageListenerContainer() {
DefaultMessageListenerContainer lc = new DefaultMessageListenerContainer();
lc.setConnectionFactory(connectionFactory);
lc.setDestinationName(topic_name);
lc.setMessageListener(new SimpleMessageListener());
lc.setDurableSubscriptionName(subscriptionName);
lc.setPubSubDomain(true);
//tried multiple combinations here, also setting only setMaxConcurrentConsumers
lc.setConcurrency("2-20");
lc.setConcurrentConsumers(20);
lc.setMaxConcurrentConsumers(20);
lc.setSubscriptionDurable(true);
lc.initialize();
lc.start();
return lc;
}
}
【问题讨论】:
-
如果你注释掉 setSubscriptionDurable(true) 会改变吗?也许您总是使用相同的订阅名称,而 Solace 不喜欢这样?
-
刚刚试了一下。在两个应用程序实例的情况下,创建了四个主题端点(同一个主题),这对我来说很好。在日志输出中也没有订阅者无法连接的错误。当然,所有这些都是非持久的,但我需要持久订阅,绝对没有办法解决它。不得丢失任何消息。