【问题标题】:Throttling an AMQP Consumer Using RabbitMQ使用 RabbitMQ 限制 AMQP 消费者
【发布时间】:2016-02-08 18:23:21
【问题描述】:

我在可靠性模式中使用 AMQP,我的用例是将消息放入队列中,然后使用它们并将信息插入到 Web 服务中。我的网络服务很慢,我的队列可能有很多很多的消息,我想确保消费者不会杀死我的数据库。

在 RabbitMQ 中是否有内置的方式来执行限制,无论是基于时间的(每分钟/秒/小时只有 X 条消息)还是其他一些机制?

【问题讨论】:

标签: java rabbitmq amqp


【解决方案1】:

per-connection 流控制,所以如果服务器上有太多消息,发布者将等待。 RabbitMQ 是一个非常可靠的系统,我可以说你不用担心它。

如果你在谈论如何限制消费,可能你必须自己照顾它。您还可以查看 channel.flow(自 RabbitMQ 3.3.0 起已弃用)和 basic.qos 方法,或者您甚至可以暂时断开消费者并在您的服务能够接受时重新连接它们负载。

UPD 我可以建议您使用 basic.consume 使用消息并将其提供给您的 Web 服务。根据您的 Web 服务处理负载多长时间,您可能会猜测它的负载并执行某种sleep(N)。当您的消费者在睡觉时,它不会消耗任何东西,因此不会提供任何网络服务。

【讨论】:

  • 我并不真正担心代理备份,我更担心如果我的 AMQP 连接器一次提取所有消息,我的 Web 服务会因连接过多而过载。
  • 您使用哪种语言编写应用程序?
  • Java。我希望不必为此编写自己的逻辑,并且 RabbitMQ/AMQP 可能内置了一些东西。
  • 从代理获取消息有两种主要方法:basic.get 获取一条消息(如果有),basic.consume 持续消费消息,只要它们在队列中可用。根据此信息和我上面的更新,您可以根据您的网络服务可用性 basic.get 消息。请注意,如果此队列没有消息,则 basic.get 可能不会返回任何消息。这种方法不应该存在特定于平台的问题。
【解决方案2】:

我想知道“Per-Connection Flow Control”是否与channel.flow()有关。

基本上你可以拨打channel.flow(false);通知代理停止发送消息。
调用channel.flow(true); 使流再次激活。这是javadoc

【讨论】: