【发布时间】:2017-07-12 17:59:52
【问题描述】:
使用pika库的BlockingConnection连接RabbitMQ,发布消息时偶尔会报错:
致命的套接字错误:error(32, 'Broken pipe')
这是来自一个非常简单的子进程,它从内存队列中取出一些信息,并将一个小的 JSON 消息发送到 AMQP。该错误似乎仅在系统几分钟未发送任何消息时出现。
设置:
connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
exchange='xyz',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False
)
排队代码捕获任何连接错误并重试:
def _enqueue(self, message_id, data):
try:
published = self.channel.basic_publish(
self.amqp_exchange,
self.amqp_routing_key,
json.dumps(data),
pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message_id
)
)
# Confirm delivery or retry
if published:
self.retry_count = 0
else:
raise EnqueueException("Message publish not confirmed.")
except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
pika.exceptions.UnroutableError, socket.timeout) as e:
self.retry_count += 1
if self.retry_count < 5:
logging.warning("Reconnecting and resending")
if self.connection.is_open:
self.connection.close()
self.connect()
self._enqueue(message_id, data)
else:
raise e
这有时适用于第二次尝试。它通常会挂起一段时间,或者只是在最终抛出异常之前丢弃消息(possibly related bug report)。因为它只在系统安静几分钟时才会发生,我猜这是由于连接超时。但 AMQP 有一个心跳系统,据报道 pika 使用它 (related bug report)。
为什么我会收到此错误或丢失消息,以及为什么在不使用时连接不会保持打开状态?
【问题讨论】: