【问题标题】:Pika heartbeat terminates connectionPika 心跳终止连接
【发布时间】:2021-03-18 10:50:39
【问题描述】:

我有一个代码,它只是用 pika 将消息排入代理队列。

class Publisher:

    def __init__(self, config):
        self._params = ConnectionParameters(
            host = config.RABBITMQ_HOST,
            credentials = PlainCredentials(config.RABBITMQ_USER, config.RABBITMQ_PASSWORD))
        self._conn = None
        self._channel = None
        self.exchange_name = config.RABBITMQ_AGENT_EXCHANGE


    def connect(self):
        if not self._conn or self._conn.is_closed:
            self._conn = BlockingConnection(self._params)
            self._channel = self._conn.channel()
            self._channel.exchange_declare(exchange=self.exchange_name,  exchange_type = 'topic')

    def _publish(self, task):
        properties = BasicProperties(expiration=task.expiration_ms)
        self._channel.basic_publish(exchange= self.exchange_name,
                                    routing_key = task.routing_key,
                                    properties = properties if task.has_expiration else None,
                                    body=dumps(task, cls = TaskEncoder).encode())
        logging.debug('message sent: %s', task)


    def publish(self, msg):
        """Publish msg, reconnecting if necessary."""

        try:
            self._publish(msg)
        except ConnectionClosed:
            logging.error('reconnecting to queue')
            self.connect()
            self._publish(msg)

Pika 停止将消息排入队列以与下一条消息进行长时间连接,并且不再抛出任何错误

2021-03-14 12:25:09,981 MainThread-140100212655936 pika.heartbeat [INFO] - Connection is idle, 1 stale byte intervals
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)>
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)

代码用法

publisher = Publisher(config)
publisher.connect()
while True:
  publisher.publish(obj)
  time.sleep(1)

我有两个问题:

如何预防?在这种情况下是否可以禁用心跳?

如何使用防火墙重现/模拟这种行为?我尝试在 RMQ 端口上添加丢包规则,但没有成功。

鼠兔版本:1.0.1

RMQ 版本:3.8.9

Python:3.8.6

【问题讨论】:

    标签: python rabbitmq pika


    【解决方案1】:

    有很多帖子建议保持回调执行时间很短,所以控制应该回到 pika。 see more on it here

    如果您有大事要计算,最好将它们跨越到另一个线程/进程,或者将它们累积起来以供以后执行。 这个概念对我使用其他实时 API 有所帮助。

    似乎发生了什么,兔子正在关闭频道(如果你的手不在其他地方忙,你可能会在回调中捕捉到它),并且在我看来心跳被 pika 停止了。

    • 链接

    【讨论】:

      猜你喜欢
      • 2015-12-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多