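【Posted】: 2020-06-22 05:11:23
【Question】:
I have an ActiveMQ consumer written with Python's stomp.py library. These are long-running consumers (e.g. one week). We have disabled heartbeats (0,0). After some time (e.g. 20 hours), the consumer gets stuck as soon as it picks up a message. We have written a reconnect mechanism that reconnects the consumer whenever the connection fails, but the consumer still gets stuck.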
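For reference, STOMP 1.1 negotiates heart-beating separately for each direction, from the client's CONNECT value (cx, cy) and the broker's CONNECTED value (sx, sy): if either relevant side is 0 there is no heart-beating in that direction, otherwise the interval is the larger of the two. A minimal sketch of those rules follows; the function names are hypothetical (not part of stomp.py), and the timeout estimate assumes stomp.py roughly multiplies the expected server interval by `heart_beat_receive_scale`. With (0, 0), as in this consumer, nothing flows either way, so a silently dropped TCP connection is not detected by heart-beating.

```python
def negotiate_heartbeats(client, server):
    """Return (send_ms, expect_ms) from the client's point of view.

    client: (cx, cy) sent in CONNECT; server: (sx, sy) received in CONNECTED.
    0 means "cannot send" / "do not want to receive" in that direction.
    """
    cx, cy = client
    sx, sy = server
    # Client -> server: needs the client able to send (cx) and the server wanting them (sy).
    send_ms = 0 if cx == 0 or sy == 0 else max(cx, sy)
    # Server -> client: needs the server able to send (sx) and the client wanting them (cy).
    expect_ms = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return send_ms, expect_ms


def receive_timeout_seconds(expect_ms, receive_scale):
    """Approximate silence tolerated before the connection is considered dead (assumption)."""
    if expect_ms == 0:
        return None  # no server heart-beats expected, so no timeout-based detection
    return expect_ms / 1000.0 * receive_scale


if __name__ == "__main__":
    # Broker value (10000, 10000) is a hypothetical example, not ActiveMQ's actual setting.
    print(negotiate_heartbeats((0, 0), (10000, 10000)))          # (0, 0)
    _, expect = negotiate_heartbeats((15000, 15000), (10000, 10000))
    print(receive_timeout_seconds(expect, 2.0))                  # 30.0
    _, expect = negotiate_heartbeats((60000, 60000), (10000, 10000))
    print(receive_timeout_seconds(expect, 2.0))                  # 120.0
```

Under these assumptions, a 15-second setting tolerates about 30 seconds of silence, which a 40-second message handler could exceed if it blocks the thread that reads from the socket; a 60-second setting tolerates about 120 seconds.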
Logic:
1. Try to connect.
2. Check is_connected() (True or False).
3. If False, go back to step 1.
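The three steps above can be sketched as a bounded retry loop with capped exponential backoff. This is a minimal, library-agnostic sketch: `connect` and `is_connected` are hypothetical placeholders for the consumer's `create_connection()` and `conn.is_connected()`, and the backoff parameters are illustrative, not values from the original code.

```python
import logging
import time

log = logging.getLogger(__name__)


def connect_with_retry(connect, is_connected, max_attempts=5,
                       base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call connect() until is_connected() is True, doubling the wait each time.

    Returns the number of attempts used; raises RuntimeError if all attempts fail.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect()                          # step 1: try to connect
        except Exception as exc:               # any failure counts as "not connected"
            log.warning("attempt %d failed: %s", attempt, exc)
        if is_connected():                     # step 2: check the connection
            return attempt
        if attempt < max_attempts:             # step 3: wait, then back to step 1
            sleep(delay)
            delay = min(delay * 2, max_delay)
    raise RuntimeError(f"not connected after {max_attempts} attempts")
```

Passing `sleep` as a parameter keeps the loop testable without real delays; bounding the attempts (unlike an unbounded `while` loop) lets the caller decide what to do when the brokers stay unreachable.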
Environment details:
- Python 3.7
- stomp.py 4.1.22
- ActiveMQ 5.15.8 [network of brokers in an active/standby (master/slave) setup]
Python code:
```python
import os
import ssl
import time

import stomp

# custom_logger, ConsumerInterface, ConsumerListener, select_host,
# ContainerStoppedError and the upper-case constants (MAX_CONNECTION_ATTEMPTS,
# ENCODE_FORMAT, ACK_MODES, ...) are defined elsewhere in the author's project.
LOG = custom_logger.get_logger("core")


class ActiveMQConsumer(ConsumerInterface):
    def __init__(self, service_conf):
        """
        Initialize the ActiveMQConsumer: create the connection and register the listener.
        Args:
            service_conf : ActiveMQ configuration dict.
        Returns:
            None
        Raises:
            ConnectFailedException : raised when no ActiveMQ broker can be reached
            Exception : any other exception is re-raised
        """
        try:
            LOG.verbose("Entry")
            self.username = service_conf["username"]
            self.password = service_conf["password"]
            self.hosts = service_conf["hosts"]
            self.ports = service_conf["ports"]
            self.callback = None
            self.subscription_details = {}
            self.heartbeats = service_conf.get("heartbeats", (0, 0))
            self.heart_beat_receive_scale = service_conf.get("heart_beat_receive_scale", 2.0)
            self.listener_obj = ConsumerListener(self)
            self.create_connection()
        except stomp.exception.ConnectFailedException as stomp_ex:
            raise stomp_ex
        except Exception as ex:
            raise ex

    def create_connection(self):
        """
        Create a connection to one of the ActiveMQ brokers.
        Args:
            None
        Returns:
            None
        Raises:
            ConnectFailedException : raised after MAX_CONNECTION_ATTEMPTS failed attempts
        """
        LOG.verbose("Entry")
        exclude_hosts = []
        for retry_count in range(MAX_CONNECTION_ATTEMPTS):
            try:
                selected_host, selected_port = select_host(self.hosts, self.ports,
                                                           exclude_hosts)
                # Materialize the (host, port) pairs: zip() is a one-shot iterator in Python 3.
                conn_param = list(zip(selected_host, selected_port))
                self.conn = stomp.Connection11(conn_param, heartbeats=self.heartbeats,
                                               encoding=ENCODE_FORMAT,
                                               heart_beat_receive_scale=self.heart_beat_receive_scale,
                                               reconnect_attempts_max=MAX_STOMP_RECONNECT_ATTEMPTS,
                                               reconnect_sleep_increase=RECONNECT_SLEEP_INCREASE)
                self.conn.set_ssl(for_hosts=conn_param, ssl_version=ssl.PROTOCOL_TLS)
                self.conn.set_listener('connection_listener', self.listener_obj)
                self.conn.start()
                self.conn.connect(self.username, self.password, wait=True)
                LOG.info("Connected to activemq host :: " + str(selected_host))
                LOG.verbose("Exit")
                break
            except stomp.exception.ConnectFailedException as stomp_ex:
                LOG.info("retry fail count::" + str(retry_count))
                # Skip this broker on the next attempt; start over once all were tried.
                exclude_hosts.extend(selected_host)
                if len(exclude_hosts) == len(self.hosts):
                    exclude_hosts = []
                if (retry_count + 1) == MAX_CONNECTION_ATTEMPTS:
                    LOG.verbose("Exit")
                    raise stomp_ex

    def __subscribe(self):
        """
        Subscribe to the configured queues on the ActiveMQ broker.
        Args:
            None
        Returns:
            None
        Raises:
            Exception : any exception is re-raised
        """
        try:
            LOG.verbose("Entry")
            subscription_id = 0
            for (destination, ack_mode) in self.subscription_details.items():
                self.conn.subscribe(destination=destination, id=subscription_id,
                                    ack=ACK_MODES[ack_mode],
                                    headers={"activemq.prefetchSize": 1})
                subscription_id += 1
            LOG.verbose("Exit")
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def consume(self, subscription_details, callback, **kwargs):
        """
        Subscribe to ActiveMQ and reconnect (after a sleep) when the connection fails.
        Args:
            subscription_details : dict mapping queue names to their configuration (ack_mode).
            callback : callback function that receives the messages.
        Returns:
            None
        Raises:
            Exception : any exception is re-raised
        """
        try:
            LOG.verbose("Entry")
            self.callback = callback
            for (destination, config) in subscription_details.items():
                self.subscription_details[destination] = config.get("ack_mode", 2)
            while True:
                self.__subscribe()
                while self.conn.is_connected():
                    if os.getenv("CONTAINER_STOP", "FALSE") == "TRUE":
                        raise ContainerStoppedError()
                    # This loop holds the main thread while the ActiveMQ connection is available.
                    time.sleep(CONSUMER_SLEEP_TIME)
                self.__reconnect()
        except Exception as ex:
            LOG.verbose("Exit")
            raise ex

    def __reconnect(self):
        """
        Close the inactive connection and create a new connection to ActiveMQ.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        reconnect_counter = 0
        while not self.conn.is_connected():
            try:
                self.close_connection()
                time.sleep(CONSUMER_RETRY_WAIT_TIME)
                self.create_connection()
                LOG.info("Connected to ActiveMQ...")
            except stomp.exception.StompException:
                reconnect_counter += 1
                LOG.info("reconnect failure count :: %s" % reconnect_counter)
        LOG.verbose("Exit")

    def close_connection(self):
        """
        Close the connection.
        Args:
            None
        Returns:
            None
        Raises:
            None
        """
        LOG.verbose("Entry")
        self.conn.disconnect()
        LOG.verbose("Exit")
```
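The failover order produced by `create_connection()` depends on `select_host`, which the question does not show. The sketch below is a hypothetical stand-in that matches how the code uses it (it returns one-element host and port lists that are then zipped), together with a small simulation of the `exclude_hosts` bookkeeping from the `except` branch. The host names and ports are made up for illustration.

```python
def select_host(hosts, ports, exclude_hosts):
    """Hypothetical helper: pick the first broker not yet excluded.

    Returns ([host], [port]) so that zip(selected_host, selected_port)
    yields a single (host, port) pair, as create_connection() expects.
    """
    for host, port in zip(hosts, ports):
        if host not in exclude_hosts:
            return [host], [port]
    return [hosts[0]], [ports[0]]  # everything excluded: fall back to the first broker


def failover_order(hosts, ports, attempts):
    """Simulate which broker each failed attempt in create_connection() would try."""
    exclude_hosts = []
    order = []
    for _ in range(attempts):
        selected_host, selected_port = select_host(hosts, ports, exclude_hosts)
        order.append((selected_host[0], selected_port[0]))
        exclude_hosts.extend(selected_host)   # same bookkeeping as the except branch
        if len(exclude_hosts) == len(hosts):
            exclude_hosts = []                # all brokers tried: start over
    return order


if __name__ == "__main__":
    print(failover_order(["mq-a", "mq-b"], [61614, 61614], 4))
    # [('mq-a', 61614), ('mq-b', 61614), ('mq-a', 61614), ('mq-b', 61614)]
```

With an active/standby pair this alternates between the two brokers, so after a master/slave switch the next attempt reaches the other node.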
【Comments】:
-
Why did you disable heartbeats? They are especially important for long-running clients.
-
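We used heartbeats to keep the connection alive when there is no regular load. But sometimes messages got duplicated because processing a message took a long time. If so, should I increase the heartbeat timeout, @JustinBertram? I'd appreciate your thoughts on this. Thanks for your reply.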
-
Why exactly would messages be duplicated?
-
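Hi @JustinBertram, we had set the heartbeat interval to 15 seconds. In some cases processing a message takes more than 40 seconds, and because of this the consumers were being disconnected. We have now changed it to 60 seconds and it works. Thanks.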
-
Is it possible to embed this code in a Flask application and run it as a background thread? Python newbie here; any link on this would help.
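One framework-agnostic way to run a blocking loop like `consume()` next to a web app is a daemon thread with a stop flag. This is a minimal sketch using only the standard library; `BackgroundConsumer` and `run_once` are hypothetical names, and `run_once` stands in for one iteration of the consumer's `is_connected()`/sleep loop.

```python
import threading


class BackgroundConsumer:
    """Run a repeating task in a daemon thread until stop() is called."""

    def __init__(self, run_once, interval=1.0):
        self._run_once = run_once
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, name="mq-consumer",
                                        daemon=True)

    def _loop(self):
        while not self._stop.is_set():
            self._run_once()
            # wait() returns early as soon as stop() sets the event
            self._stop.wait(self._interval)

    def start(self):
        self._thread.start()

    def stop(self, timeout=None):
        self._stop.set()
        self._thread.join(timeout)
```

In a Flask app the thread would be started once at application startup. Two caveats, stated as things to check rather than guarantees: Flask's debug reloader may import the app twice (and so start two consumers), and a multi-worker WSGI server starts one consumer per worker process.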