【问题标题】:RabbitMQ / Pika - guaranteeing that messages are received in the order created?RabbitMQ / Pika - 保证按创建的顺序接收消息?
【发布时间】:2023-03-16 03:21:02
【问题描述】:

作为一个简单的例子,我将 5 个项目添加到一个新的 RabbitMQ(v 2.6.1) 队列中:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

我清除了我的队列,然后运行上面的代码来添加 5 个项目:

nkodner@hadoop4 sports_load_v2$ python send_5.py 
 [x] Sent '1'
 [x] Sent '2'
 [x] Sent '3'
 [x] Sent '4'
 [x] Sent '5'

现在,我正在尝试模拟失败的处理。给定以下代码以从队列中使用。请注意,我对 basic_ack 的调用已注释掉:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

我运行接收代码以从队列中取出一个项目。正如我所料,我得到了第 1 项:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

由于对 channel.basic_ack() 的调用已被注释掉,我希望将未确认的消息放在队列中,以便下一个消费者得到它。我希望消息 #1 是队列中的第一条消息(再次),并将 Redelivered 属性设置为 True。相反,收到消息 #2:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

并且队列中的所有其他消息都在 #1 返回之前接收到,并将重新传递标志设置为 True:

...

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

我是否可以设置任何属性或选项,以便在得到确认之前一直收到 #1?

我的用例是加载具有顺序生成文件的数据仓库。我们正在使用基于消息的处理来让我的程序知道一些新文件已准备好并要加载到 DW 中。我们必须按照它们生成的顺序来处理这些文件。

【问题讨论】:

  • edit:这个问题随着 rabbitmq 2.7 的发布而得到解决。从 2.7.0 开始,未确认/拒绝的项目被放回队列的前面。

标签: python messaging rabbitmq amqp pika


【解决方案1】:

这已在 RabbitMQ 2.7.0 中解决 - 我们运行的是 2.6.1。

来自release notes

此版本中的新功能包括:

  • 为消费者重新排队的消息的保留顺序

【讨论】:

    【解决方案2】:

    尝试使用 channel.basic_reject - 这应该将未确认的消息推送回 RabbitMQ,RabbitMQ 会将消息视为新消息。另外——如果你有一条失败的消息卡住了,你可以使用 channel.basic_recover 告诉 RabbitMQ 重新传递所有未确认的消息。

    http://www.rabbitmq.com/extensions.html#negative-acknowledgements 提供有关 Basic.Reject 与 Basic.Nack 的区别信息。

    消息排序语义在http://www.rabbitmq.com/semantics.html进行解释

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-07-23
      • 2019-05-04
      • 1970-01-01
      • 1970-01-01
      • 2020-04-20
      • 2012-12-26
      相关资源
      最近更新 更多