【发布时间】:2021-01-07 15:51:25
【问题描述】:
我有 2 个通过rabbitmq 进行通信的微服务,我需要实现优先级消息。
第一个微服务充当发布者,用 symfony + messenger(amqp 传输)编写。
第二个微服务充当消费者,用python + pika编写。
messenger 文档 (https://symfony.com/doc/current/messenger.html#prioritized-transports) 建议对不同的消息优先级使用单独的队列,该组件无法使用 rabbitmq 的内置功能来对消息进行优先级排序。而且实际上发布者没有问题,我对其进行了配置,以便必要的消息进入优先级队列。
消费者出现问题,我无法让 pika 先读取优先级队列,然后再读取常规队列。
以下是我的 messenger 组件配置示例:
framework:
messenger:
transports:
priority:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: priority
queues:
priority: ~
normal:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: normal
queues:
normal: ~
routing:
'App\Message\PriorityRequest': priority
'App\Message\NormalRequest': normal
这就是我填满队列的方式:
for ($i = 0; $i < 10; $i++) {
$bus->dispatch(new PriorityRequest($i, 'priority'));
$bus->dispatch(new NormalRequest($i, 'normal'));
}
下面是一个在 python + pika 中实现消费者的例子:
import pika
import os
def do_work(self, connection, channel, delivery_tag, body):
print(body)
parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='priority', durable=True)
channel.queue_declare(queue='normal', durable=True)
channel.basic_consume(queue='priority', on_message_callback=do_work, auto_ack=True)
channel.basic_consume(queue='normal', on_message_callback=do_work, auto_ack=True)
channel.start_consuming()
如果我们运行消费者代码,我们会得到以下输出:
{'id': 0, 'data': 'priority'}
{'id': 0, 'data': 'normal'}
{'id': 1, 'data': 'priority'}
{'id': 1, 'data': 'normal'}
{'id': 2, 'data': 'priority'}
{'id': 2, 'data': 'normal'}
{'id': 3, 'data': 'priority'}
{'id': 3, 'data': 'normal'}
{'id': 4, 'data': 'priority'}
{'id': 4, 'data': 'normal'}
{'id': 5, 'data': 'priority'}
{'id': 5, 'data': 'normal'}
{'id': 6, 'data': 'priority'}
{'id': 6, 'data': 'normal'}
{'id': 7, 'data': 'priority'}
{'id': 7, 'data': 'normal'}
{'id': 8, 'data': 'priority'}
{'id': 8, 'data': 'normal'}
{'id': 9, 'data': 'priority'}
{'id': 9, 'data': 'normal'}
消息按FIFO顺序处理,如何强制pika先处理来自优先队列的消息,只有当优先队列为空时才进入普通队列?
【问题讨论】: