【发布时间】:2019-07-18 21:39:40
【问题描述】:
我想获得一份存在于 Rabbitmq 队列中的消息的副本,而不使用它们。可能吗 ? 提前致谢,
【问题讨论】:
标签: python-3.x rabbitmq
我想获得一份存在于 Rabbitmq 队列中的消息的副本,而不使用它们。可能吗 ? 提前致谢,
【问题讨论】:
标签: python-3.x rabbitmq
我发现有一种更好的方法可以使用channel.basic_get() 函数获取队列中的所有消息,如下代码所示:
def __init__(self):
self.host = ConfigTools().get_attr('host')
self.waiting_queue = ConfigTools().get_attr('test_queue_name')
def view_queue(self) -> list:
"""Reads everything from the queue, then disconnects, causing the server to requeue the messages
Returning the delivery tag is pointless at this point because the server makes the tag (an integer) up on
the fly and the order can shuffle constantly"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
msgs = []
while True:
chl = connection.channel()
method_frame, header_frame, body = chl.basic_get(queue='test')
if method_frame:
print("body : ", body)
msgs.append(body)
else:
print("No more messages returned")
connection.close()
break
return msgs
然后,如果在任何时候我知道我想弹出队列中的哪条消息,我可以使用类似的东西:
def remove(self, item) -> list:
"""Removes the item from the queue. Goes through the entire queue, similar to view_queue, and acknowledges
the msg in the list that matches, and returns the msg.
If item matches more than one message on the queue, only one is deleted
"""
if isinstance(item, list):
if not (isinstance(i, bytes) for i in item):
print("Item must be a list of only byte objects")
if not isinstance(item, bytes):
print("Item must be a singe bytes object")
raise TypeError
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
msgs = []
while True:
chl = connection.channel()
method_frame, header_frame, body = chl.basic_get(queue='test')
if method_frame:
print('body: ', body)
if body == item:
print("item found!")
msgs.append(body)
chl.basic_ack(method_frame.delivery_tag)
connection.close()
return msgs
else:
print("Message not found")
connection.close()
break
return msgs
注意:我将它用于一个小型应用程序 - 队列上的消息少于 50 条。我不能说这个功能在更大的应用程序中会如何发挥作用。
【讨论】:
我想获取 Rabbitmq 队列中存在的消息的副本 不消耗它们。有可能吗?
没有。您最接近的选择是消费或获取一条消息,然后通过否定确认拒绝它。
注意:RabbitMQ 团队会监控 rabbitmq-users mailing list 并且有时只回答 StackOverflow 上的问题。
【讨论】:
也许你可以注册一个消费者(如官方文档中的 here 所示)无需向经纪人确认:no_ack=True
channel.basic_consume(callback, queue='hello', no_ack=True)
这样您的消费者会收到消息内容,但消息本身并没有被代理标记为已传递(当您退出时返回Ready 状态)。
这可能不是满足您需要的最干净的方法,但它有效且简单。
您可以采用的另一种(但类似的)方法是基于所谓的 pull API(与您在注册 时使用的 push API 相反)订户);我在 .Net 应用程序中使用了这种方法:您可以找到 .Net 文档here,我认为 Python API 在这方面也很相似。
关键思想是得到消息而不给出确认:channel.BasicGet(queueName, noAck)
我希望这可以帮助您迈向完整且可靠的解决方案!
【讨论】: