【问题标题】:Active mq consumers count decreasing to 0Activemq 消费者数量减少到 0
【发布时间】:2018-04-27 02:25:42
【问题描述】:

我们面临一个奇怪的问题,其中一些随机队列的 activemq 消费者正在减少,直到它们变为 0,之后它们就无法恢复。 一旦发生这种情况,我们必须再次重新部署消费者应用程序以开始处理。 我们一直在努力解决这个问题,但无法找出根本原因。

activemq 代理版本 5.14.5

以下是连接配置。

<bean id="activeMQIconnectConnectionFactory" class="test.ActiveMQIconnectConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://localhost:61616)"/>
        <property name="prefetchPolicy" ref="prefetchPolicy"/>
        <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
        <property name="trustAllPackages" value="true"/>
        <!-- http://activemq.apache.org/consumer-dispatch-async.html
        The default setting is dispatchAsync=true
         If you want better thoughput and the chances of having a slow consumer are low, you may want to change this to false.
         -->
        <property name="dispatchAsync" value="true"/>
        <!-- 
        whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost.
         Default is false
         -->
        <property name="disableTimeStampsByDefault" value="true"/>

        <!-- http://activemq.apache.org/optimized-acknowledgement.html
        This option is disabled by default but can be used to improve throughput in some circumstances as it decreases load on the broker.
         -->
        <property name="optimizeAcknowledge" value="true"/>
        <!-- Default 300ms
        For us, 5 sec.
         -->
        <property name="optimizeAcknowledgeTimeOut" value="5000"/>
        <property name="useAsyncSend" value="true"/>
        <property name="exceptionListener" ref="jmsExceptionListener"/>
    </bean>

 <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="test.queue"/>
    </bean>

    <bean id="jmsProducerFactoryPool" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
          init-method="start">
        <property name="connectionFactory" ref="activeMQIconnectConnectionFactory"/>
        <property name="maxConnections" value="10"/>
        <property name="maximumActiveSessionPerConnection"
                  value="1000"/>
        <property name="createConnectionOnStartup" value="true"/>
    </bean>

    <bean id="jmsConsumerFactoryPool" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
          init-method="start">
        <property name="connectionFactory" ref="activeMQIconnectConnectionFactory"/>
        <property name="maxConnections" value="10"/>
        <property name="maximumActiveSessionPerConnection"               value="1000"/>
        <property name="createConnectionOnStartup" value="true"/>
        <property name="reconnectOnException" value="true"/>
        <property name="idleTimeout" value="86400000"/>
    </bean>

    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="maximumRedeliveries" value="1"/>
        <property name="queue" value="*"/>
    </bean>

    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
        <property name="queuePrefetch" value="500"/>
    </bean>

    <bean id="jmsTemplate" class="com.minda.iconnect.jms.impl.TimedJmsTemplate">
        <property name="connectionFactory" ref="jmsProducerFactoryPool"/>
        <property name="defaultDestinationName" value="iconnect.queue"/>
        <property name="deliveryPersistent" value="true"/>
        <!-- I think this is ingored if explicitQosEnabled is not set -->
    </bean>

    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<bean id="testProducer"
          class="com.test.TestProducer">
        <property name="consumerDestination" ref="testQueu"/>
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="messageConverter" ref="simpleMessageConverter"/>
    </bean>

    <bean id="testContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConsumerFactoryPool"/>
        <property name="destination" ref="testS"/>
        <property name="messageListener">
            <bean class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
                <property name="delegate" ref="testConsumer"/>
                <property name="defaultListenerMethod" value="process"/>
                <property name="messageConverter" ref="simpleMessageConverter"/>
            </bean>
        </property>
        <property name="concurrentConsumers" value="50"/>
        <property name="maxConcurrentConsumers" value="100"/>
        <property name="sessionTransacted" value="false"/>
        <property name="autoStartup" value="true"/>
    </bean>
</beans>

connectionFactory 类

public class ActiveMQIconnectConnectionFactory extends org.apache.activemq.ActiveMQConnectionFactory
{
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQIconnectConnectionFactory.class);

    @Override
    public void setBrokerURL(String brokerURL)
    {
        // LOG when connecting to activemq
        // using this setter to be sure it's only logged once
        // See DJ-5780
        LOG.info("ActiveMQ configured is: " + (DEFAULT_BROKER_URL.equals(brokerURL) ? "(default init setting) " : "") + brokerURL);
        LOG.info("Connecting to ActiveMQ");
        super.setBrokerURL(brokerURL);
    }
}

到目前为止,我们一直在考虑超时等参数,但不是运气。 我们怀疑问题是由于某些连接问题或通过 DMLC 处理连接而导致的,但无法确定问题。非常感谢您的帮助!

【问题讨论】:

    标签: spring jms activemq


    【解决方案1】:

    我认为您的问题是基于您的配置相互影响的 Spring DMLC 和 AMQ 行为的混合。

    尝试改变:

    <property name="optimizeAcknowledgeTimeOut" value="500"/>
    

    org.springframework.jms.listener.DefaultMessageListenerContainer.setReceiveTimeout(0);

    或者

    org.springframework.jms.listener.DefaultMessageListenerContainer.setReceiveTimeout(10000);
    org.springframework.jms.listener.DefaultMessageListenerContainer.setIdleTaskExecutionLimit(100);
    

    https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.html#setReceiveTimeout-long-

    https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int-

    public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) 指定空闲执行的限制 消费者任务,在其执行过程中没有收到任何消息。 如果达到此限制,任务将关闭并离开接收 到其他执行任务。默认为1,关闭空闲资源 一旦任务没有收到消息,就提早。这适用于动态 仅调度;请参阅“maxConcurrentConsumers”设置。最小值 消费者数量(见“concurrentConsumers”)将保持在 直到关闭为止。

    在每个任务执行中,消息接收尝试的次数 (根据“maxMessagesPerTask”设置)每个都会等待一个 传入消息(根据“receiveTimeout”设置)。我摔倒 那些在给定任务中接收尝试返回而没有消息的人, 相对于接收到的消息,该任务被认为是空闲的。这样一个 任务仍可能重新安排;但是,一旦达到指定 “idleTaskExecutionLimit”,它将关闭(在动态的情况下 缩放)。

    如果您遇到过于频繁的放大和缩小问题,请提高此限制。 随着这个限制更高,闲置的消费者将被保留 更长的时间,避免在新加载消息后重新启动消费者 进来。或者,指定更高的“maxMessagesPerTask”和/或 “receiveTimeout”值,这也会导致空闲消费者被 保持更长的时间(同时也增加了平均 每个计划任务的执行时间)。

    http://activemq.apache.org/performance-tuning.html

    优化确认在自动确认模式下消费消息时 (在创建消费者会话时设置),ActiveMQ 可以确认 批量接收消息返回到代理(以改进 表现)。 批量大小是预取限制的 65% 消费者。此外,如果消息消耗很慢,将发送批处理 每 300 毫秒。 您可以通过设置打开批量确认 在 ActiveMQ ConnectionFactory 上 optimizeAcknowledge=true。

    http://activemq.apache.org/what-is-the-prefetch-limit-for.html

    一旦代理发送了预取限制数量的消息到 消费者 它不会再向该消费者发送任何消息 直到消费者确认至少 50% 的预取 它收到的消息,例如 prefetch/2。 当代理有 收到所述确认,它将发送进一步的 prefetch/2 给消费者“充值”的消息数量,可以说,它的 预取缓冲区。请注意,可以指定预取限制 以每位消费者为基础(见下文)。

    【讨论】:

      猜你喜欢
      • 2021-12-02
      • 2014-08-06
      • 2019-08-29
      • 2012-04-18
      • 2012-12-24
      • 1970-01-01
      • 1970-01-01
      • 2018-08-22
      • 2020-01-04
      相关资源
      最近更新 更多