【问题标题】:RabbitMQ: throttling fast producer against large queues with slow consumerRabbitMQ:针对具有慢消费者的大型队列限制快速生产者
【发布时间】:2015-01-20 09:37:23
【问题描述】:

我们目前正在使用 RabbitMQ,其中一个持续超快的生产者与一个受限于有限资源(例如缓慢的 MySQL 插入)的消费者配对。

我们不喜欢使用x-max-length 声明队列,因为一旦达到限制,所有消息都将被丢弃或死信,而且我们不想丢失消息。

添加更多消费者很容易,但它们都会受到一个共享资源的限制,因此这是行不通的。问题依然存在:如何减慢生产者的速度?

当然,我们可以在 Redis、memcached、MySQL 或生产者读取为 pointed out in an answer to a similar question 的其他东西中放置一个流控制标志,或者更好的是,生产者可以定期测试队列长度并限制自身,但这些看起来像黑客攻击我。

我主要是在质疑我是否存在根本性的误解。我原以为这是一种常见的情况,所以我想知道:

限制生产者的最佳做法是什么? RabbitMQ 如何做到这一点?还是您以完全不同的方式做到这一点?

背景

假设制作人实际上知道如何通过正确的输入减慢自己的速度。例如。硬件传感器或硬件随机数生成器,可以根据需要生成任意数量的事件。

在我们特定的真实案例中,我们有一个 API,用户可以使用它来添加消息。我们不想吞噬和丢弃消息,而是希望通过让我们的 API 在队列“已满”时返回错误来施加背压,以便调用者/用户知道退避,或者让 API 阻塞直到消费者赶上。我们不控制用户,所以不管消费者有多快,我都可以创建一个更快的生产者。

我希望有类似 TCP 套接字的 API,write() 可以阻塞,select() 可以用来确定句柄是否可写。因此,要么让 RabbitMQ API 阻塞,要么让它在队列已满时返回错误。

【问题讨论】:

  • 只是出于好奇,您有没有解决过这个问题?我们想实现一个类似的功能,如果消费者过载,我们希望生产者采取一些行动。
  • 不,对不起。目前还没有解决办法。
  • 谢谢彼得!如果我们找到任何解决方案,我会通知您。

标签: rabbitmq throttling


【解决方案1】:

对于 x-max-length 属性,您说您不希望消息被丢弃或死信。我看到有一个更新,为此添加了更多功能。正如我所见,它在文档中指定:

"使用溢出设置来配置队列溢出行为。如果溢出设置为reject-publish,则最近发布的消息将被丢弃。此外,如果启用了发布者确认,则发布者将被通知拒绝通过basic.nack 消息”

据我了解,您可以使用队列限制来拒绝来自发布者的新消息,从而向上游施加一些背压。

【讨论】:

    【解决方案2】:

    我不认为这是 rabbitmq 特有的。基本上,您有一个场景,其中有两个具有不同处理能力的系统,这种不匹配将造成队列溢出的风险(无论它是什么),或者即使在生产者和消费者之间不断不匹配的情况下,只需创建事件创建和处理之间的时间距离越来越长。

    我曾经处理过这种场景,不幸的是没有灵丹妙药。您要么必须加快处理速度(更好的硬件,更合适的软件?),要么限制事件创建(这与 MQ 无关)。

    现在,我想问你目标是什么以及事件是如何产生的。事件是不断产生的,要么是无限的,要么是非常高的(例如来自传感器的读数 - 越多越好),或者它们是成批/尖峰创建的(例如:特定时间段内的用户请求、批量加载来自 CRM 系统)。我认为目标是处理所有内容,因为您提到您不想丢失任何排队的消息。

    如果输出是恒定的,那么某些限制器(内部计数器,如果生产者是唯一的生产者,或者外部队列长度检查队列是否可以被其他系统填充)肯定就位。

    IF eventsInTimePeriod/timePeriod > estimatedConsumerBandwidth
    THEN LowerRate()
    ELSE RiseRate()
    

    在现实世界的场景中,我们过去常常简单地将输出手动限制为估计值,并且为队列长度、从队列进入到队列离开的时间等设置了一些警报。在这些限制器被省略的地方(大部分是错误的)我们使用稍后找到一些本应在几个小时内完成的任务,而这些任务却要等待三个月才能轮到他们。

    恐怕很难回答“如何减慢制作人的速度?”如果我们对此一无所知,但一些想法是:上述速率检查或阻塞 AddMessage 方法:

    AddMessage(message)
        WHILE(getQueueLength() > maxAllowedQueueLength)
            spin(1000); // or sleep or whatever
        mqAdapter.AddMessage(message)
    

    我想说这完全取决于生产者应用程序的具体情况以及您的架构。

    【讨论】:

    • 感谢您的周到回复。也许我的问题不清楚。我已经相应地澄清了这个问题。
    • 不幸的是,我无法帮助处理 RabbitMQ 的具体细节,从未使用过该特定技术 - 也许其他人会有一些很好的见解。我有一种感觉,看到您的实际事件来源是多个用户的情况,您的情况相当复杂。例如,简单的阻塞直到队列降到限制以下会很危险,因为一些客户端可能不走运并且永远保持锁定状态(因为一旦长度下降,其他客户端就会将其推回到限制之上)。当队列超过一定长度时可能只返回一个错误,但同样......
    • ...但同样需要考虑一些陷阱(比如一些客户从未遇到过错误,而另一些客户则经常遇到错误,这完全是偶然的机会)。
    猜你喜欢
    • 1970-01-01
    • 2016-08-27
    • 2012-12-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-10
    • 2011-10-21
    • 2012-01-28
    相关资源
    最近更新 更多