【问题标题】:RabbitMQ exchange queue max lengthRabbitMQ 交换队列最大长度
【发布时间】:2019-02-06 10:50:06
【问题描述】:

我正在尝试在基于 RabbitMQ 的应用程序中应用一些流控制。

我的系统的一个非常狭窄的简介:

  • 有一些蓝工扫描并输入目录并将消息发布到交换。
  • 还有其他 red-workers 从这个交换中消费(基于路由键)并对数据执行任何操作而不是删除它。

“存储”在交换中的数据非常大,工作人员检查它所花费的时间是显而易见的。一段时间后,我收到来自 RabbitMQ 的内存警告,指出内存使用率过高,所有发布操作都已停止。

我试图增加 Rabbitmq 正在使用的内存量,但它只是将问题推迟了几个小时(运行时)。我还使队列基于磁盘而不是基于内存,但我的磁盘已满。

由于我的输入不是那么大,我可以忍受一个“大”输入队列,蓝色工人从中读取他们的输入。所以我想尝试在蓝工和交易所之间的链接上设置一些“最大长度”。我相信我不会在这里丢失任何东西,因为我系统的真正瓶颈是红色工人(顺便说一下,我用prefech_count=2 声明了红色工人和交易所之间的链接)。

说了这么多之后...我没有设法应用这样的最大长度:( 我正在使用Pika 来声明我的队列并使用频道。 我阅读了这篇文章 (https://www.rabbitmq.com/maxlength.html),但没有设法在我的代码中实现它,我希望看到一个使用这个 max-size 标志的示例。

【问题讨论】:

    标签: rabbitmq rabbitmq-exchange


    【解决方案1】:

    Exchange 不存储消息:队列可以。

    如果您设置队列的最大长度,在queue_declare() 调用中设置相应的arguments,不适合它的消息将被丢弃。

    但是,发布者可以在不注意的情况下将消息发布到交易所。

    生产者可以使用dead-lettering 收到有关它们失败的通知。

    如果您需要代码方面的帮助,请将代码包含在问题中。

    【讨论】:

    • 哦,我明白了。我现在设法通过使用“x-max-length”参数来做到这一点。但是我不能丢失消息,所以我也尝试添加{'x-overflow': 'reject-publish'} 参数,但channel.basic_publish 的返回值始终为True,即使我可以看到消息不会继续完整的管道。你知道为什么吗?
    • 您是否启用了publisher confirms?从有关overflow 的文档中,该配置仅向rabbitmq 指示要丢弃哪些消息,新消息而不是旧消息。如果您获得了 basic.nack,则发布者将收到消息尚未到达其预期目的地(队列)的通知。根据您的描述,我假设您已经在发布者方面实施了某种等待,以便在延迟后再次尝试?
    • @AndyThomas 因为如果交换可达,消息总是会成功发布。如果您的交换没有绑定到它的队列,则消息会丢失。如果队列已满,则消息会丢失。如果你不想丢失消息,你需要使用死信,或者从消费者到生产者的某种反馈,让生产者减速/等待。但这太抽象了 - 添加一个小的工作示例让我们帮助您。
    • @Sigismondo 谢谢!我想我确实会像你说的那样实现一些反馈机制。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-05
    • 2014-05-21
    • 2022-10-18
    • 2013-07-25
    • 2020-04-30
    • 2017-08-21
    • 2012-07-03
    相关资源
    最近更新 更多