【问题标题】:Python: Kombu+RabbitMQ Deadlock - queues are either blocked or blockingPython:Kombu+RabbitMQ 死锁 - 队列被阻塞或阻塞
【发布时间】:2023-08-31 03:41:01
【问题描述】:

问题

我有一个RabbitMQ Server,用作我的一个系统的队列中心。在过去一周左右的时间里,它的生产者每隔几个小时就会完全停止。

我尝试了什么

蛮力

  • 停止消费者会释放锁定几分钟,然后阻塞返回。
  • 重启 RabbitMQ 解决了几个小时的问题。
  • 我有一些自动脚本可以执行丑陋的重启,但它显然远非正确的解决方案。

分配更多内存

cantSleepNow's answer之后,我增加了memory allocated to RabbitMQ to 90%。服务器拥有高达 16GB 的内存,并且消息数量不是很高(每天数百万),所以这似乎不是问题。

从命令行:

sudo rabbitmqctl set_vm_memory_high_watermark 0.9

还有/etc/rabbitmq/rabbitmq.config:

[
   {rabbit,
   [
     {loopback_users, []},
     {vm_memory_high_watermark, 0.9}
   ]
   }
].

代码与设计

我为所有消费者和生产者使用 Python。

生产者

生产者是提供调用的 API 服务器。每当有呼叫到达时,就会打开连接,发送消息并关闭连接。

from kombu import Connection

def send_message_to_queue(host, port, queue_name, message):
    """Sends a single message to the queue."""
    with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
        simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
        simple_queue.put(message)
        simple_queue.close()

消费者

消费者之间略有不同,但通常使用以下模式 - 打开连接,并等待消息到达。连接可以长时间保持打开状态(例如,几天)。

with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
    while True:
        queue = conn.SimpleQueue(queue_name)
        message = queue.get(block=True)
        message.ack()

设计推理

  • 消费者始终需要与队列服务器保持开放连接
  • 生产者会话应仅在 API 调用的生命周期内有效

这种设计直到大约一周前还没有引起任何问题。

网页视图仪表板

Web 控制台显示 127.0.0.1172.31.38.50 中的消费者阻止 172.31.38.50172.31.39.120172.31.41.38172.31.41.38 中的消费者。

系统指标

为了安全起见,我检查了服务器负载。正如预期的那样,平均负载和 CPU 利用率指标都很低。

为什么兔子的MQ会各自出现这样的死锁?

【问题讨论】:

  • 我没有理解这部分the consumers reuse the same connection while waiting on new messages。生产者如何恢复连接?连接是否只能由生产者或(或真的)消费者在同一时刻使用?
  • @cantSleepNow 谢谢!在这里和问题-1中澄清。每个消费者打开一个连接,然后从中获取消息。对于消费者的整个生命周期——可能是几天——它使用相同的连接。 2. 每个生产者等待 API 调用。当一个新的呼叫到达时,它会打开一个连接,向它写入数据,然后立即关闭它。
  • 好的,所以没有连接共享,每个消费者和生产者都是独立的进程?你能以某种方式确定生产者关闭连接时会发生什么吗?另外,您是在使用阻塞连接还是选择(请参阅此问题以供参考*.com/questions/11987838/…)编辑对不起,由于某种原因我不能做@回复标签...
  • @AdamMatan 你能发布日志吗?当您的连接被阻塞/阻塞时,很可能是针对某些 RabbitMQ 警报。你用的是哪个版本?
  • 我今天早上看到了这个博客blog.domanski.me/rapid-rabbitmq。笔者也有类似的情况。他的结论是管理插件启用了 RabbitMQ 内部的一些代码,导致流控制表现不佳,队列中的所有内容最终都被阻塞。在他的情况下,禁用管理插件修复了它。

标签: python rabbitmq deadlock blockingqueue kombu


【解决方案1】:

这很可能是由 RabbitMQ 3.6.2 的管理模块中的内存泄漏引起的。此问题现已在 RabbitMQ 3.6.3 中得到修复,可通过 here 获取。

问题本身在here 中进行了描述,但也在 RabbitMQ 消息板上进行了广泛讨论;例如herehere。众所周知,这会导致很多奇怪的问题,一个很好的例子是报告的问题here

作为新版本发布之前的临时修复,您可以升级到新的 est 版本、降级到 3.6.1 或完全禁用管理模块。

【讨论】:

  • 我刚刚降级到 3.6.1(3.6.3 在 Ubuntu 14.04 LTS 上存在依赖问题),迫不及待想看看它是否有效。
  • 如果您还有其他问题,请告诉我。
  • 当然。你的家庭住址。我的妻子会送你鲜花,我的老板会送你披萨和啤酒,我会送你一瓶好酒。我有两晚不间断的睡眠(好吧,除了孩子们)。进展顺利。
  • 顺便说一句,我不敢相信他们网站上的RabbitMQ still features 3.6.2。网络插件大家都用,而且有毒。
  • @AdamMatan:降级后这仍然有效吗?顺便3.6.3正式发布!
【解决方案2】:

我写这篇文章是为了回答,部分是因为它可能会有所帮助,部分是因为它太大而不能成为评论。

首先我很抱歉错过了这个message = queue.get(block=True)。也是免责声明 - 我不熟悉 python 和 PIKA API。

AMQP's basic.get is actually synchronous 并且您正在设置block=true。正如我所说,不知道这在 PIKA 中意味着什么,但结合不断合并队列,听起来效率不高。因此,无论出于何种原因,发布者都可能由于队列访问被消费者阻止而拒绝连接。它实际上非常适合您通过Stopping the consumers releases the lock for a few minutes, but then blocking returns. 临时解决问题的方式

我建议尝试使用 AMQP 的 basic.consume 而不是 basic.get。我不知道获取的动机是什么,但在大多数情况下(无论如何我的经验)你应该使用消费。仅引用上述链接

此方法提供对队列中消息的直接访问,使用 一种同步对话,专为特定类型的 同步功能比更重要的应用程序 性能。

RabbitMQ docs 中,它说当代理资源不足时连接被阻塞,但正如您所写的那样,负载非常低。为了安全起见,您可以检查内存消耗和可用磁盘空间。

【讨论】:

  • 非常感谢您的回答! consume() 似乎部分支持,而 get() 完全支持;我在SimpleQueue 中看不到任何consume() 方法。但是,当队列中没有消息时,消费者阻塞队列是有意义的。阻塞适用于阅读,而不是写作,结束 - 除非我在这里遗漏了什么。
  • @AdamMatan 欢迎您。我不知道你需要从这些方法中得到什么,但是在教程中使用了消耗,所以我假设它可以工作 rabbitmq.com/tutorials/tutorial-one-python.html 无论如何,请告诉我(我相信其他人也有兴趣)它是如何工作的事实证明。
  • 我增加了内存(更新了“我尝试了什么”部分)。服务器每天都在失败。