【问题标题】:RabbitMQ broken pipe error or lost messagesRabbitMQ 损坏管道错误或丢失消息
【发布时间】:2017-07-12 17:59:52
【问题描述】:

使用pika库的BlockingConnection连接RabbitMQ,发布消息时偶尔会报错:

致命的套接字错误:error(32, 'Broken pipe')

这是来自一个非常简单的子进程,它从内存队列中取出一些信息,并将一个小的 JSON 消息发送到 AMQP。该错误似乎仅在系统几分钟未发送任何消息时出现。

设置:

connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)

排队代码捕获任何连接错误并重试:

def _enqueue(self, message_id, data):
    try:
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,
                message_id=message_id
            )
        )

        # Confirm delivery or retry
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout) as e:
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data)
        else:
            raise e

这有时适用于第二次尝试。它通常会挂起一段时间,或者只是在最终抛出异常之前丢弃消息(possibly related bug report)。因为它只在系统安静几分钟时才会发生,我猜这是由于连接超时。但 AMQP 有一个心跳系统,据报道 pika 使用它 (related bug report)。

为什么我会收到此错误或丢失消息,以及为什么在不使用时连接不会保持打开状态?

【问题讨论】:

    标签: python rabbitmq amqp pika


    【解决方案1】:

    来自另一个bug report

    由于 BlockingConnection 不在后台处理心跳,并且 heartbeat_interval 不能覆盖服务器建议的心跳间隔(这也是一个错误),我建议默认情况下应该禁用心跳(依赖 TCP keep-alive 代替) .

    如果在消费块中处理任务所需的时间比服务器建议的心跳间隔时间长,则连接将被服务器关闭,客户端在处理完成后将无法确认消息。

    v1.0.0 中的 update 可能有助于解决此问题。

    所以我实施了一个解决方法。每 30 秒我通过队列发布一条心跳消息。这使连接保持打开状态,并具有向客户端确认我的应用程序已启动并正在运行的额外好处。

    【讨论】:

      【解决方案2】:

      Broken Pipe 错误意味着当客户端关闭连接时,服务器正在尝试将某些内容写入套接字。

      如我所见,您有一些共享的“self.connection”可能在并行线程之前/并行线程中关闭?

      您还可以将日志记录级别设置为 DEBUG 并查看客户端的日志以确定客户端关闭连接的时刻。

      【讨论】:

      • 所以这意味着连接在我这边而不是 Rabbit 那边关闭? self.connection 是在其自己的进程中运行的对象的一部分,因此连接不与任何其他线程或进程共享。