【发布时间】:2019-09-04 10:01:35
【问题描述】:
您好,我正在使用 ActiveMQ 学习 Spring JMS。 在我的示例场景中,生产者应用程序在队列中发送大约 50 条消息,当我启动消费者应用程序时,它开始使用这些消息。
现在我希望多个消费者线程使用队列中的消息。 我正在使用 JMS 侦听器容器。当我用谷歌搜索时,我发现有一个 concurrency 属性。
根据Spring JMS doc并发属性指定
为每个侦听器启动的并发会话/消费者数。可以是表示最大数量的简单数字(例如“5”),也可以是表示下限和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认为 1;在主题侦听器或队列排序很重要的情况下,将并发限制为 1;考虑为一般队列提高它。
但在我的配置中,我将此属性设置为 5,但似乎无法启动 5 个并发侦听器。
监听器配置:
consumer-applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />
<bean id="listener" class="com.jms.example.MyMessageListener"></bean>
<jms:listener-container container-type="default" concurrency="5"
connection-factory="connectionFactory">
<jms:listener destination="MyQueue" ref="listener"
method="onMessage"></jms:listener>
</jms:listener-container>
</beans>
如果我使用带有属性的 bean DefaultMessageListenerContainer 而不是 jms:listener-container:
<bean id="msgListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="destination"
p:messageListener-ref="listener"
p:concurrentConsumers="10"
p:maxConcurrentConsumers="50" />
然后在 ActiveMQ 控制台中,我可以看到 10 个消费者,但实际上它同时启动了 3 个消费者,有时是 6 个或只有 1 个消费者。
编辑:
消费者代码:
public class MyMessageListener implements MessageListener{
public void onMessage(Message m) {
TextMessage message=(TextMessage)m;
try{
System.out.println("Start = " + message.getText());
Thread.sleep(5000);
System.out.println("End = " + message.getText());
}catch (Exception e) {e.printStackTrace(); }
}
}
我正在控制台上打印消费消息,其输出在以下场景中解释:
观察:
我观察到一些奇怪的行为。 我的生产者和消费者是两个独立的应用程序。
场景 - 1:
- 我启动生产者并发送消息(同时消费者没有运行)
- 然后我开始消费者消费消息。
这里的问题是它没有加载所有 10 个消费者。有时它会加载 3 OR 1。
Start = hello jms 1 // consumer 1 started
Start = hello jms 2 // consumer 2 started
Start = hello jms 3 // consumer 3 started
End = hello jms 1 // consumer 1 ended
Start = hello jms 4 // consumer 4 started and hence always 3 consumers and not 10
End = hello jms 2
Start = hello jms 5
End = hello jms 3
Start = hello jms 6
场景 - 2:
- 我启动生产者并发送消息(同时消费者正在运行)
- 由于消费者处于运行状态,它开始消费它们。
所以它确实按预期正确加载了所有 5 个消费者。所以输出是:
Start = hello jms 1 // consumer 1 started
Start = hello jms 2 // consumer 2 started
Start = hello jms 3 // consumer 3 started
Start = hello jms 4 // consumer 4 started
Start = hello jms 5 // consumer 5 started
Start = hello jms 6 // consumer 6 started
Start = hello jms 7 // consumer 7 started
Start = hello jms 8 // consumer 8 started
Start = hello jms 9 // consumer 9 started
Start = hello jms 10 // consumer 10 started. Hence all them started at same time as expected.
End = hello jms 1
Start = hello jms 11
End = hello jms 2
Start = hello jms 12
End = hello jms 3
Start = hello jms 13
为什么会这样。它真的在吃我的大脑。 我不想让消费者永远运行。我想保持两者分离。
请帮忙。
【问题讨论】:
-
“实际上它同时启动 3 个消费者”是什么意思?如果你可以在 AMQ 控制台看到 10 个消费者,那么你就有 10 个消费者。试着随着时间的推移给它稳定的负载,你会注意到所有 10 个消费者都会得到负载。
-
@PetterNordlander 我将更新我的问题以澄清问题。
-
@PetterNordlander 请查看我的编辑。我的意思是在我的消费者代码中说我正在打印收到的消息文本。所以从输出来看,它似乎同时启动了 3 个消费者,而不是 10 个。
-
@PetterNordlander 请参阅我在上述问题中的观察。我已经更新了我的问题。
-
@mahendra 如果您只指定一个数字,则意味着将启动“最大”数量的消费者。这意味着总是小于“最大值”启动,除非 Spring JMS 认为它需要更多。尝试指定一个具有最小值的范围,并为您的消费者提供稳定的负载(而不是 50 条消息)。您会看到所有消费者的工作量都达到了指定的最大值。
标签: java spring concurrency activemq spring-jms