【问题标题】:RabbitMQ: How to move failed message from one queue to another queue?RabbitMQ:如何将失败的消息从一个队列移动到另一个队列?
【发布时间】:2020-08-13 10:43:07
【问题描述】:

我有两个队列:

当我运行 rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin 时同样可见:

我明白了:

+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost |          name           |          node           | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| /     | high_priority           | rabbit@server-rabbitmq  | 5        | 0.0                                |
| /     | high_priority_secondary | rabbit@server-rabbitmq  | 0        | 0.0                                |
+-------+-------------------------+-------------------------+----------+------------------------------------+

我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin)如下:

+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
|                         | direct  |
| amq.direct              | direct  |
| amq.fanout              | fanout  |
| amq.headers             | headers |
| amq.match               | headers |
| amq.rabbitmq.trace      | topic   |
| amq.topic               | topic   |
| high_priority           | direct  |
| high_priority_secondary | direct  |
| low_priority            | direct  |
+-------------------------+---------+

队列和整个相关逻辑是在 PHP / Symfony 中实现的,但是我想使用本机逻辑(如果可能的话)通过在终端中使用 rabbitmqadminrabbitmqctl 命令。

如果high_priority 上的消息失败,我希望 RabbitMQ 自动将其移动到 high_priority_secondary 队列,而无需任何 PHP 参与。这可能吗?我已经开始阅读有关 Dead Letter Exchanges 的信息,但我不确定如何处理。

我已经为辅助队列创建了一个消费者,所以只要将消息移到那里,它就会被处理。

是否可以仅在 CLI 中实现?

仅供参考:有一些关于 SO 的建议帖子已经涵盖了这个问题,但没有一个解决方案是纯粹的 CLI 解决方案。

【问题讨论】:

标签: php rabbitmq amqp


【解决方案1】:

high_priority_secondary 队列应该绑定到high_priority_secondary 交换。 high_priority queue 应该绑定到 high_priority exchange 并且应该使用 x-dead-letter-exchange = high_priority_secondary 声明。

所以应该用死信交换声明队列。

要对此进行测试,只需在您使用来自high_priority 队列的消息时拒绝带有重新排队的消息。

【讨论】:

    【解决方案2】:

    好的,虽然我不必修改任何 PHP 代码,但我确实必须在框架级别更改 yaml 配置,因为我希望我的解决方案能够持久化并成为代码库的一部分。

    在你的app/config/services/rabbitmq.yaml:

    定义生产者:

    high_priority:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'high_priority'
            type: direct
    high_priority_secondary:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'high_priority_secondary'
            type: direct
    message_hospital:
        connection: default
        class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
        exchange_options:
            name: 'message_hospital'
            type: direct
    

    定义消费者:

    high_priority:
        connection: default
        exchange_options:
            name: 'high_priority'
            type: direct
        queue_options:
            name: 'high_priority'
            arguments:
                x-dead-letter-exchange: ['S', 'high_priority_secondary']
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    high_priority_secondary:
        connection: default
        exchange_options:
            name: 'high_priority_secondary'
            type: direct
        queue_options:
            name: 'high_priority_secondary'
            arguments:
                x-dead-letter-exchange: ['S', 'message_hospital']
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    message_hospital:
        connection: default
        exchange_options:
            name: 'message_hospital'
            type: direct
        queue_options:
            name: 'message_hospital'
        qos_options:
            prefetch_size: 0
            prefetch_count: 1
            global: false
        callback: foo.task_bus.consumer
    

    现在队列看起来像:

    多亏了 DLX 属性,消息一旦在之前的队列中失败,就会进入医院队列。

    【讨论】:

      最近更新 更多