【Question title】: ActiveMQ long-running Python consumer processes get stuck
【Posted】: 2020-06-22 05:11:23
【Question description】:

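I have an ActiveMQ consumer written in Python with the stomp.py library. These are long-running consumers (for example, 1 week). We have disabled heartbeats (0,0). After some time (for example, 20 hours), a consumer gets stuck as soon as it picks up a message. We have written a reconnect mechanism for the consumer that reconnects whenever the connection fails, but the consumer still gets stuck.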
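
For context on what heartbeats of (0,0) mean: under the STOMP 1.1 specification, client and server each advertise a pair of intervals in milliseconds, and a zero on either side of a direction turns heartbeats off for that direction. The helper below is a minimal sketch of that negotiation rule; the function name `negotiate_heartbeats` is made up for illustration and is not part of stomp.py.

```python
def negotiate_heartbeats(client, server):
    """Return (send_ms, receive_ms) as seen by the client.

    client = (cx, cy): cx is the smallest interval at which the client can
    send heartbeats, cy is the interval at which it wants to receive them.
    server = (sx, sy): the same two values from the server's side.
    A value of 0 disables heartbeats in that direction.
    """
    cx, cy = client
    sx, sy = server
    # Client -> server heartbeats need both cx and sy to be non-zero.
    send_ms = 0 if cx == 0 or sy == 0 else max(cx, sy)
    # Server -> client heartbeats need both sx and cy to be non-zero.
    receive_ms = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return send_ms, receive_ms


if __name__ == "__main__":
    # With (0, 0) on the client, nothing is exchanged whatever the broker offers.
    print(negotiate_heartbeats((0, 0), (10000, 10000)))
```

With heartbeats off in both directions, neither side has a protocol-level way to notice that the other end has silently gone away, which is one plausible reason a consumer can appear connected while being stuck.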

Logic:

  1. Try to connect
  2. is_connected() #true or false
  3. If false -> go to step 1
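
The three steps above can be sketched as a small retry loop. This is an illustration only: `FakeConnection` is a hypothetical stand-in for a broker connection so the loop can run without a broker, and `connect_with_retry` is not the asker's code.

```python
import time


class FakeConnection:
    """Hypothetical stand-in for a broker connection (illustration only)."""

    def __init__(self, fail_times):
        self.fail_times = fail_times  # how many connect() calls should fail
        self.connected = False

    def connect(self):
        if self.fail_times > 0:
            self.fail_times -= 1
            raise ConnectionError("broker unavailable")
        self.connected = True

    def is_connected(self):
        return self.connected


def connect_with_retry(conn, max_attempts=5, base_delay=0.01, sleep=time.sleep):
    """Step 1: try to connect. Step 2: check is_connected().
    Step 3: if false, go back to step 1. Returns the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            conn.connect()
        except ConnectionError:
            pass  # fall through to the is_connected() check
        if conn.is_connected():
            return attempt
        sleep(base_delay * attempt)  # wait a little longer after each failure
    raise ConnectionError("gave up after %d attempts" % max_attempts)


if __name__ == "__main__":
    print(connect_with_retry(FakeConnection(fail_times=2)))
```

A loop like this only helps if is_connected() actually turns false. It reflects what the client library currently believes, so with heartbeats disabled a dead connection may keep reporting true for a long time.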

Environment details:

  1. Python 3.7
  2. stomp.py 4.1.22
  3. ActiveMQ 5.15.8 [network of brokers with an active/standby {master/slave} setup]

Python code:

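import os
import ssl
import time

import stomp

import custom_logger  # project-specific logging helper, not shown in the question

# ConsumerInterface, ConsumerListener, select_host, ContainerStoppedError and the
# upper-case constants (MAX_CONNECTION_ATTEMPTS, ACK_MODES, ...) are defined
# elsewhere in the asker's project and are not shown in the question.
LOG = custom_logger.get_logger("core")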

class ActiveMQConsumer(ConsumerInterface):

    def __init__(self, service_conf):
        """
        Initialize the ActiveMQConsumer class that call create connection and adds listener.
        Args:
            service_conf : ActiveMQ configuration dict.
        Returns:
            None
        Raises:
            ConnectFailedException : Error while unable to connect to ActiveMQ
            Exception : Base exception to catch all other exception
        """
        try:
            LOG.verbose("Entry")
            self.username = service_conf["username"]
            self.password = service_conf["password"]
            self.hosts = service_conf["hosts"]
            self.ports = service_conf["ports"]
            self.callback = None
            self.subscription_details = {}
            self.heartbeats = service_conf.get("heartbeats", (0, 0))
            self.heart_beat_receive_scale = service_conf.get("heart_beat_receive_scale", 2.0)
            self.listener_obj = ConsumerListener(self)
            self.create_connection()

        except stomp.exception.ConnectFailedException as stomp_ex:
            raise stomp_ex
        except Exception as ex:
            raise ex

    def create_connection(self):
        """
        create connection with activemq brokers
        Args:
            None
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        LOG.verbose("Entry")
        exclude_hosts = []
        for retry_count in range(MAX_CONNECTION_ATTEMPTS):
            try:
                selected_host, selected_port = select_host(self.hosts, self.ports,
                                                           exclude_hosts)
                # Materialise the pairs once: zip() returns a one-shot iterator in Python 3.
                conn_param = list(zip(selected_host, selected_port))
                self.conn = stomp.Connection11(
                    conn_param,
                    heartbeats=self.heartbeats,
                    encoding=ENCODE_FORMAT,
                    heart_beat_receive_scale=self.heart_beat_receive_scale,
                    reconnect_attempts_max=MAX_STOMP_RECONNECT_ATTEMPTS,
                    reconnect_sleep_increase=RECONNECT_SLEEP_INCREASE)
                self.conn.set_ssl(for_hosts=conn_param,
                                  ssl_version=ssl.PROTOCOL_TLS)
                self.conn.set_listener('connection_listener', self.listener_obj)
                self.conn.start()
                self.conn.connect(self.username, self.password, wait=True)
                LOG.info("Connected to activemq host :: "+str(selected_host))
                LOG.verbose("Exit")
                break
            except stomp.exception.ConnectFailedException as stomp_ex:
                LOG.info("retry fail count::"+str(retry_count))
                exclude_hosts.extend(selected_host)
                if len(exclude_hosts) == len(self.hosts):
                    exclude_hosts = []
                if (retry_count+1) == MAX_CONNECTION_ATTEMPTS:
                    LOG.verbose("Exit")
                    raise stomp_ex

    def __subscribe(self):
        """
        subscribes to queue in ActiveMQ broker
        Args:
            None
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        try:
            LOG.verbose("Entry")
            subscription_id = 0
            for (destination, ack_mode) in self.subscription_details.items():
                self.conn.subscribe(destination=destination, id=subscription_id,
                                    ack=ACK_MODES[ack_mode],
                                    headers={"activemq.prefetchSize":1}
                                    )
                subscription_id += 1
            LOG.verbose("Exit")
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def consume(self, subscription_details, callback, **kwargs):
        """
        call subscription for activemq and provides connection failure retry with sleep.
        Args:
            subscription_details : dict mapping queue names to their configuration (ack_mode).
            callback : callback function to send messages.
        Returns:
            None
        Raises:
            Exception : Base exception to catch all exception
        """
        try:
            LOG.verbose("Entry")
            self.callback = callback
            for (destination, config) in subscription_details.items():
                self.subscription_details[destination] = config.get("ack_mode", 2)
            while True:
                self.__subscribe()
                while self.conn.is_connected():
                    if os.getenv("CONTAINER_STOP", "FALSE") == "TRUE":
                        raise ContainerStoppedError()
                    #this loop holds the main thread till activemq connection available
                    time.sleep(CONSUMER_SLEEP_TIME)
                self.__reconnect()
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def __reconnect(self):
        """
        closes the inactive connection and creates the new connection with activemq.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        reconnect_counter = 0
        while not self.conn.is_connected():
            try:
                self.close_connection()
                time.sleep(CONSUMER_RETRY_WAIT_TIME)
                self.create_connection()
                LOG.info("Connected to ActiveMQ...")
            except stomp.exception.StompException:
                reconnect_counter += 1
                LOG.info("reconnect failure count :: %s"%reconnect_counter)
        LOG.verbose("Exit")

    def close_connection(self):
        """
        closes the connection.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        self.conn.disconnect()
        LOG.verbose("Exit")

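【Question discussion】:

  • Why disable heartbeats? They are especially important for long-running clients.
  • Heartbeats are used to keep the connection alive when there is no regular load. But sometimes, because a message takes a long time to process, messages get duplicated. If so, should I increase the heartbeat timeout, @JustinBertram? I would appreciate your thoughts on this. Thanks for your reply.
  • Why specifically would messages be duplicated?
  • Hi @JustinBertram, we had set the heartbeat interval to 15 seconds. In some cases message processing takes more than 40 seconds, and because of that the consumers were getting disconnected. We have now changed it to 60 seconds and it works. Thanks.
  • Is it possible to embed this code in a Flask application and run it as a background thread? Python newbie here, so any links on this would help.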
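
On the interaction between a 15-second heartbeat and 40-second message processing mentioned in the comments: one likely explanation is that stomp.py runs listener callbacks on the thread that reads from the socket, so a slow handler keeps incoming heartbeats from being noticed in time. Besides raising the interval, another option is to return from the callback quickly and do the work on a separate thread. The sketch below assumes the `on_message(headers, body)` signature of stomp.py 4.x; `QueueingListener` is a made-up name, and in real use it would subclass `stomp.ConnectionListener`.

```python
import queue
import threading


class QueueingListener:
    """Hands each message to a worker thread so on_message returns at once.

    Assumption: in real use this would subclass stomp.ConnectionListener.
    """

    def __init__(self, handler):
        self.handler = handler          # the slow, user-supplied processing function
        self.pending = queue.Queue()    # messages waiting to be processed
        self.errors = []                # exceptions raised by the handler
        self.worker = threading.Thread(target=self._drain, daemon=True)
        self.worker.start()

    def on_message(self, headers, body):
        # Called by the library; only enqueue, never block here.
        self.pending.put((headers, body))

    def _drain(self):
        while True:
            headers, body = self.pending.get()
            try:
                self.handler(headers, body)
            except Exception as exc:  # keep the worker alive on handler errors
                self.errors.append(exc)
            finally:
                self.pending.task_done()


if __name__ == "__main__":
    seen = []
    listener = QueueingListener(lambda headers, body: seen.append(body))
    listener.on_message({}, "first")
    listener.on_message({}, "second")
    listener.pending.join()  # wait until the worker has handled both
    print(seen)
```

With client acknowledgement modes, the acknowledgement would then have to be sent from the worker once processing finishes, which this sketch leaves out.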

Tags: python activemq stomp


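【Solution 1】:

I think you should disable heartbeats and add on_disconnected to your listener to reconnect!

【Discussion】:

  • We should not disable heartbeats, because these are long-running consumers. Heartbeats are used to keep the connection alive.
  • We use the connection.is_connected() method to check the connection state; it returns false when the connection is lost. That is how we detect a lost connection and reconnect to ActiveMQ. Please let me know whether we have to include on_disconnect in the listener: does it provide any additional check that the is_connected() method does not handle?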
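
As a rough illustration of the listener-based approach discussed here, the sketch below reacts to the `on_disconnected` and `on_heartbeat_timeout` callbacks of stomp.py 4.x by setting a `threading.Event`, so the main thread can block on the event instead of polling is_connected() in a sleep loop. `DisconnectSignal` and `wait_for_disconnect` are hypothetical names; in real use the class would subclass `stomp.ConnectionListener` and be registered with `set_listener`.

```python
import threading


class DisconnectSignal:
    """Flips an Event when the library reports that the connection dropped.

    Assumption: in real use this would subclass stomp.ConnectionListener.
    """

    def __init__(self):
        self.disconnected = threading.Event()

    def on_connected(self, headers, body):
        # A fresh connection clears any earlier disconnect signal.
        self.disconnected.clear()

    def on_disconnected(self):
        self.disconnected.set()

    def on_heartbeat_timeout(self):
        # Only fires when heartbeats are enabled; treat it as a lost connection.
        self.disconnected.set()


def wait_for_disconnect(listener, timeout):
    """Block for up to `timeout` seconds; True means a reconnect is needed."""
    return listener.disconnected.wait(timeout)


if __name__ == "__main__":
    listener = DisconnectSignal()
    print(wait_for_disconnect(listener, 0.01))  # nothing has happened yet
    listener.on_disconnected()
    print(wait_for_disconnect(listener, 0.01))  # the signal is now set
```

Whether this catches anything that is_connected() misses depends on the library noticing the failure in the first place. The callbacks mainly turn detection into an event rather than a poll, and `on_heartbeat_timeout` can only help when heartbeats are enabled.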