【问题标题】:RabbitMQ - purge a queue from all of its unacked messagesRabbitMQ - 从所有未确认的消息中清除队列
【发布时间】:2023-04-19 19:31:01
【问题描述】:

我的开发环境中有数千条未确认的消息,我无法重新启动。
有没有办法删除(清除)所有未确认的消息?

【问题讨论】:

    标签: rabbitmq


    【解决方案1】:

    关闭未确认消息所在的通道,这会将它们返回到队列中,然后调用清除。

    【讨论】:

    • 如何找到未确认消息所在的频道?
    • @grayaii 在你的 rabbitmq 控制台中..如果你点击未确认消息的队列,里面会有一个消费者部分,你会找到与之相关的频道。
    • 谢谢!这对我有用:我停止了队列的消费者,然后消息已移入存储桶,然后我可以使用“清除”按钮将其删除。
    【解决方案2】:

    你必须让消费者ack他们(或nack),然后他们才会被删除。或者,您可以关闭消费者并完全清除队列。

    如果您正在寻找某种方法来清除所有未确认的消息 - RabbitMQ 中既没有这样的功能,也没有 AMQP 协议。

    看起来您的消费者是问题的原因,因此您必须对其进行调整(重写)以在消息处理或失败后立即发布。

    【讨论】:

    • 它不会清除未确认的消息。来自queue.purge方法文档块:This method removes all messages from a queue which are >>> not awaiting acknowledgment <<<,这是严格的AMQP协议实现。
    • @pinepain 我花了一些时间编写代码来查找带有消息的队列。然后调用purge 命令,然后循环等待,但队列从未清空。感谢您的澄清。我将不得不重新考虑我的方法。
    【解决方案3】:

    一旦队列中没有“就绪”消息,将其删除并重新创建。

    【讨论】:

      【解决方案4】:

      使用此方法会丢失队列内容。

      您需要先将消息放回队列中,然后才能清除它们:

      作为替代方案,这不需要等待:

      • delete 并重新创建队列
      • 重启服务器

      【讨论】:

      • 然后被解雇。
      【解决方案5】:

      您需要调用basic.recover 来强制将所有未确认的消息重新排入失败的通道。请注意有关此函数的勘误表,指出 RabbitMQ 仅支持重新排队模式。

      【讨论】:

        【解决方案6】:

        对于软件开发人员,请使用以下代码。

        channel.purgeQueue(queue-name);

        如果我们使用此代码,队列将被清除,并且将存在相同的队列。

        【讨论】:

          【解决方案7】:

          发生这种情况的一种情况是,如果消费者由于处理错误而无法回收相同的消息。在这种情况下,the RabbitMQ queue management interface 可能会将消息显示为 Unacked,但实际上它们是从队列中读取并处理(到故障点)然后重新排队(以启用重试)在快节奏——每秒可能数千次

          在这个循环中,消息短暂地处于Ready 状态,但会立即被您的应用程序再次删除——然后循环再次开始。例如this auto-requeue behavior is the default for Spring AMQP.

          由于消息永远不会处于就绪状态,管理界面的 Get Message(s) 按钮不太可能起作用。如果您有队列访问权限,那么可行的方法是运行一个单独的自定义消费者实例,也许是在本地,但其特定目的是删除而不是重新排队有问题的消息。

          通过 RabbitMQ 的 Fair Dispatch 机制,您的额外消费者将可能收到有问题的消息,并有机会执行您的自定义处理。

          您甚至可以编写一个自定义实用程序来执行此操作,并使用逻辑来过滤、分析或死信感兴趣的消息。

          【讨论】:

            【解决方案8】:

            如果要清除队列的内容,那么可以使用AMQP方法queue.purge:AMQP中有队列清除:http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.purge

            您可以使用管理插件进行类似操作。

            【讨论】: