【问题标题】:Pika/RabbitMQ: Correct usage of add_backpressure_callbackPika/RabbitMQ:正确使用 add_backpressure_callback
【发布时间】:2012-08-08 01:34:03
【问题描述】:

我是使用 RabbitMQ 和 Pika 的新手,所以如果答案很明显,请原谅...

我们正在输入一些数据并将结果传递到我们的 rabbitmq 消息队列中。该队列正被将数据写入 elasticsearch 的进程使用。

数据的生成速度超过了将其输入弹性搜索的速度,因此队列会增长并且几乎不会缩小。

我们正在使用 pika 并收到警告:

UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.

这会持续一段时间,直到 Pika 简单地崩溃并显示一条奇怪的错误消息:

NameError: global name 'log' is not defined

我们正在使用 Pika BlockingConnection 对象 (http://pika.github.com/connecting.html#blockingconnection)。

我解决此问题的计划是使用 add_backpressure_callback 函数来创建一个函数,该函数将在每次我们需要应用背压时调用 time.sleep(0.5)。然而,这似乎是一个过于简单的解决方案,必须有更合适的方式来处理这样的事情。

我猜想队列的填充速度快于消耗速度是一种常见情况。我正在寻找一个例子,甚至是一些关于什么是减缓队列速度的最佳方法的建议。

谢谢!

【问题讨论】:

    标签: rabbitmq pika


    【解决方案1】:

    有趣的问题,正如您正确指出的那样,这可能很常见。我在 Stack Overflow 上看到了另一个相关问题,其中包含一些提示

    Pika: Write buffer exceeded warning

    此外,您可能想考虑扩大您的弹性搜索,这可能是您想要解决的根本瓶颈。快速浏览一下 elasticsearch.org 网站,我们想到了

    "分布式

    Elastic Search 的主要特点之一是其分布式特性。索引被分解成碎片,每个碎片都有 0 个或更多副本。集群中的每个数据节点都托管一个或多个分片,并充当协调者将操作委托给正确的分片。重新平衡和路由是在幕后自动完成的。 "

    (......虽然不确定插入是否也是分布式和可扩展的)

    毕竟,RabbitMQ 不应该无限增长队列。还可能希望查看扩展 RabbitMQ 本身,例如通过在 RabbitMQ 配置中使用每个队列进程等。

    干杯!

    【讨论】:

    • 感谢您的提示。我最终将 RabbitMQ 排除在外,现在一切顺利。我意识到,如果我有多台机器可以扩展任何东西,那么 RabbitMQ 将是提供数据的好方法,但是因为我正在运行弹性搜索的同一台机器上处理数据,所以它没有中间有一个消息代理是没有意义的。在这种情况下,它仅用作额外的故障点。不过,感谢您的提示,我们最终是否应该扩展我们的系统。
    猜你喜欢
    • 1970-01-01
    • 2014-05-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多