【问题标题】:Spring JMS listener-container concurrency attribute not workingSpring JMS 侦听器容器并发属性不起作用
【发布时间】:2019-09-04 10:01:35
【问题描述】:

您好,我正在使用 ActiveMQ 学习 Spring JMS。 在我的示例场景中,生产者应用程序在队列中发送大约 50 条消息,当我启动消费者应用程序时,它开始使用这些消息。

现在我希望多个消费者线程使用队列中的消息。 我正在使用 JMS 侦听器容器。当我用谷歌搜索时,我发现有一个 concurrency 属性。

根据Spring JMS doc并发属性指定

为每个侦听器启动的并发会话/消费者数。可以是表示最大数量的简单数字(例如“5”),也可以是表示下限和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认为 1;在主题侦听器或队列排序很重要的情况下,将并发限制为 1;考虑为一般队列提高它。

但在我的配置中,我将此属性设置为 5,但似乎无法启动 5 个并发侦听器。

监听器配置:

consumer-applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />

    <bean id="listener" class="com.jms.example.MyMessageListener"></bean>

    <jms:listener-container container-type="default" concurrency="5"
        connection-factory="connectionFactory">
        <jms:listener destination="MyQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

</beans>

如果我使用带有属性的 bean DefaultMessageListenerContainer 而不是 jms:listener-container

<bean id="msgListenerContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        p:connectionFactory-ref="connectionFactory"
        p:destination-ref="destination"
        p:messageListener-ref="listener"
        p:concurrentConsumers="10"
        p:maxConcurrentConsumers="50" />

然后在 ActiveMQ 控制台中,我可以看到 10 个消费者,但实际上它同时启动了 3 个消费者,有时是 6 个或只有 1 个消费者。

编辑:

消费者代码:

public class MyMessageListener implements MessageListener{


    public void onMessage(Message m) {
        TextMessage message=(TextMessage)m;
        try{
            System.out.println("Start = " + message.getText());
            Thread.sleep(5000);
            System.out.println("End = " + message.getText());
        }catch (Exception e) {e.printStackTrace();  }
    }
}

我正在控制台上打印消费消息,其输出在以下场景中解释:

观察:

我观察到一些奇怪的行为。 我的生产者和消费者是两个独立的应用程序。

场景 - 1:

  1. 我启动生产者并发送消息(同时消费者没有运行)
  2. 然后我开始消费者消费消息。

这里的问题是它没有加载所有 10 个消费者。有时它会加载 3 OR 1。

Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
End = hello jms 1  //  consumer 1 ended
Start = hello jms 4 // consumer 4 started and hence always 3 consumers and not 10
End = hello jms 2
Start = hello jms 5
End = hello jms 3
Start = hello jms 6

场景 - 2:

  1. 我启动生产者并发送消息(同时消费者正在运行)
  2. 由于消费者处于运行状态,它开始消费它们。

所以它确实按预期正确加载了所有 5 个消费者。所以输出是:

Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
Start = hello jms 4 // consumer 4 started 
Start = hello jms 5 // consumer 5 started 
Start = hello jms 6 // consumer 6 started 
Start = hello jms 7 // consumer 7 started 
Start = hello jms 8 // consumer 8 started 
Start = hello jms 9 // consumer 9 started 
Start = hello jms 10 // consumer 10 started. Hence all them started at same time as expected.
End = hello jms 1
Start = hello jms 11
End = hello jms 2
Start = hello jms 12
End = hello jms 3
Start = hello jms 13

为什么会这样。它真的在吃我的大脑。 我不想让消费者永远运行。我想保持两者分离。

请帮忙。

【问题讨论】:

  • “实际上它同时启动 3 个消费者”是什么意思?如果你可以在 AMQ 控制台看到 10 个消费者,那么你就有 10 个消费者。试着随着时间的推移给它稳定的负载,你会注意到所有 10 个消费者都会得到负载。
  • @PetterNordlander 我将更新我的问题以澄清问题。
  • @PetterNordlander 请查看我的编辑。我的意思是在我的消费者代码中说我正在打印收到的消息文本。所以从输出来看,它似乎同时启动了 3 个消费者,而不是 10 个。
  • @PetterNordlander 请参阅我在上述问题中的观察。我已经更新了我的问题。
  • @mahendra 如果您只指定一个数字,则意味着将启动“最大”数量的消费者。这意味着总是小于“最大值”启动,除非 Spring JMS 认为它需要更多。尝试指定一个具有最小值的范围,并为您的消费者提供稳定的负载(而不是 50 条消息)。您会看到所有消费者的工作量都达到了指定的最大值。

标签: java spring concurrency activemq spring-jms


【解决方案1】:

正如 Strelok 向我指出的有关预取消息的内容。创建 prefetchPolicy bean 并将 queuePrefetch 属性设置为 1。 在 connectionFactory 中设置了谁的引用。

我对配置做了一些更改,如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"
        p:prefetchPolicy-ref="prefetchPolicy" />

    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"
        p:queuePrefetch="1" />

    <bean id="listener" class="com.javatpoint.MyMessageListener"></bean>

    <jms:listener-container concurrency="10-15" connection-factory="connectionFactory">
        <jms:listener destination="javatpointQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

    <!-- The JMS destination -->

      <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="javatpointQueue" />
      </bean>
</beans>

【讨论】:

  • @Gary Russell 你能告诉我什么时候使用 spring jmsTemplate 以及什么时候使用 Spring Integration JMS 吗?如果我想用 Spring Integraion JMS 替换我现有的配置,那么我需要注意什么?
【解决方案2】:

刚刚在 spring-boot 1.5.9 应用上遇到了这个问题。

正如@Strelok 和@mahendra kawde 所指出的,问题是由于prefetchPolicy 参数。默认值为 1000。

建议使用较大的预取值以获得高性能和高消息量。但是,对于较低的消息量,每条消息需要很长时间来处理,预取应该设置为 1。这可以确保消费者一次只处理一条消息。但是,将预取限制指定为零将导致消费者轮询消息,一次一条,而不是消息被推送给消费者。

看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html

可以改变prefetchPolicy参数如下:

  1. application.properties 文件中 (working example)

    spring.activemq.broker-url=tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    
  2. 在DefaultMessageListenerContainer中通过修改destinationName参数(working example

    <bean id="cons-even" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="destinationName" value="queue-name?consumer.prefetchSize=1"/>
      ...
    </bean>
    
  3. 在 ConnectionFactory bean (working example) 中:

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setQueuePrefetch(1);
        factory.setPrefetchPolicy(policy);
        return factory;
    }
    

相关主题:

  1. How do I make Spring JMSListener burst to max concurrent threads?
  2. Dynamic scaling of JMS consumer in spring boot

【讨论】:

    【解决方案3】:

    JMS 可以在并发模式下工作。下面我分享示例 sn-p concurrentConsumers = 100 个值

    Spring JMS Documentation

    <bean id="listenerContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="concurrentConsumers">
                <value>100</value>
            </property>
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queue" />
            <property name="messageListener" ref="messageListener" />
            <property name="sessionTransacted" value="false" />
            <property name="sessionAcknowledgeMode" value="1" />
        </bean>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-02-17
      • 2015-05-19
      • 1970-01-01
      • 2016-10-10
      • 2015-05-27
      • 2020-01-07
      相关资源
      最近更新 更多