【问题标题】:How do I get pika to listen on priority queues first?如何让 pika 先听优先级队列?
【发布时间】:2021-01-07 15:51:25
【问题描述】:

我有 2 个通过rabbitmq 进行通信的微服务,我需要实现优先级消息。

第一个微服务充当发布者,用 symfony + messenger(amqp 传输)编写。

第二个微服务充当消费者,用python + pika编写。

messenger 文档 (https://symfony.com/doc/current/messenger.html#prioritized-transports) 建议对不同的消息优先级使用单独的队列,该组件无法使用 rabbitmq 的内置功能来对消息进行优先级排序。而且实际上发布者没有问题,我对其进行了配置,以便必要的消息进入优先级队列。

消费者出现问题,我无法让 pika 先读取优先级队列,然后再读取常规队列。

以下是我的 messenger 组件配置示例:

framework:
    messenger:
        transports:
            priority:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: priority
                    queues:
                        priority: ~
            normal:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: normal
                    queues:
                        normal: ~
        routing:
            'App\Message\PriorityRequest': priority
            'App\Message\NormalRequest': normal

这就是我填满队列的方式:

for ($i = 0; $i < 10; $i++) {
    $bus->dispatch(new PriorityRequest($i, 'priority'));
    $bus->dispatch(new NormalRequest($i, 'normal'));
}

下面是一个在 python + pika 中实现消费者的例子:

import pika
import os

def do_work(self, connection, channel, delivery_tag, body):
   print(body)


parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
channel.basic_qos(prefetch_count=1)

channel.queue_declare(queue='priority', durable=True)
channel.queue_declare(queue='normal', durable=True)

channel.basic_consume(queue='priority', on_message_callback=do_work, auto_ack=True)
channel.basic_consume(queue='normal', on_message_callback=do_work, auto_ack=True)

channel.start_consuming()

如果我们运行消费者代码,我们会得到以下输出:

{'id': 0, 'data': 'priority'}
{'id': 0, 'data': 'normal'}
{'id': 1, 'data': 'priority'}
{'id': 1, 'data': 'normal'}
{'id': 2, 'data': 'priority'}
{'id': 2, 'data': 'normal'}
{'id': 3, 'data': 'priority'}
{'id': 3, 'data': 'normal'}
{'id': 4, 'data': 'priority'}
{'id': 4, 'data': 'normal'}
{'id': 5, 'data': 'priority'}
{'id': 5, 'data': 'normal'}
{'id': 6, 'data': 'priority'}
{'id': 6, 'data': 'normal'}
{'id': 7, 'data': 'priority'}
{'id': 7, 'data': 'normal'}
{'id': 8, 'data': 'priority'}
{'id': 8, 'data': 'normal'}
{'id': 9, 'data': 'priority'}
{'id': 9, 'data': 'normal'}

消息按FIFO顺序处理,如何强制pika先处理来自优先队列的消息,只有当优先队列为空时才进入普通队列?

【问题讨论】:

    标签: python rabbitmq pika


    【解决方案1】:

    开箱即用的 Pika 不支持此功能。

    一个选项是首先从优先队列中basic_consume。当队列为空时,取消该消费者,然后从另一个队列中取消basic_consume。完成该工作后,重复并返回优先级队列。


    注意:RabbitMQ 团队会监控 rabbitmq-users mailing list,并且有时只回答 StackOverflow 上的问题。

    【讨论】:

      猜你喜欢
      • 2011-12-06
      • 2011-12-20
      • 1970-01-01
      • 1970-01-01
      • 2011-03-20
      • 1970-01-01
      • 2013-02-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多