【问题标题】:RabbitMQ non-blocking consumerRabbitMQ 非阻塞消费者
【发布时间】:2014-03-19 20:17:06
【问题描述】:

我在 Python 中使用 RabbitMQ 来管理生产者和多个消费者之间的多个队列。在 RabbitMQ 网站 (routing model) 的示例中,消费者被阻止。这意味着它们在每次队列中有新的“任务”时停止在 start_sumption() 并执行回调函数。

我的问题是:我如何以一种仍在等待任务的方式实现我的消费者(因此,每次队列中有新事物时都会调用回调函数)但同时他可以执行其他任务工作/代码。

谢谢

【问题讨论】:

  • 为什么不在单独的线程上运行start_consuming()
  • 好吧,我很容易找到解决方案。除了使用 basic_consume,我可以简单地在函数中使用 basic_get 并每隔 X 秒调用一次该函数。但是有一个问题:队列任务会按照某种顺序交付吗?
  • @HugoSousa 如果你能在这里发布一个完整的解决方案会很棒 - 我是 rabbitmq 的新手,它真的会帮助其他人。
  • 另一种选择是使用 pika 和 Tornado 完全异步 (pika.readthedocs.org/en/latest/examples/tornado_consumer.html)
  • @goncalopp 根据FAQ,pika 不是线程安全的

标签: python asynchronous parallel-processing queue rabbitmq


【解决方案1】:

表格FAQ:

Pika 在代码中没有任何线程概念。如果你想 使用带线程的 Pika,确保每个都有 Pika 连接 线程,在该线程中创建。分享一只鼠兔是不安全的 跨线程连接,

所以让我们在线程内部创建连接:

import pika


class PikaMassenger():

    exchange_name = '...'

    def __init__(self, *args, **kwargs):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name, 
            exchange_type='topic')

    def consume(self, keys, callback):
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        for key in keys:
            self.channel.queue_bind(
                exchange=self.exchange_name, 
                queue=queue_name, 
                routing_key=key)

        self.channel.basic_consume(
            queue=queue_name, 
            on_message_callback=callback, 
            auto_ack=True)

        self.channel.start_consuming()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.close()

def start_consumer():

    def callback(ch, method, properties, body):
        print(" [x] %r:%r consumed" % (method.routing_key, body))

    with PikaMassenger() as consumer:
        consumer.consume(keys=[...], callback=callback)


consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()

【讨论】:

    【解决方案2】:

    对于接收者

    import pika
    
    messages = []
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='message')
    
    def callback(ch, method, properties, message):
        print(message)
        messages.append(message)
    
    channel.basic_consume(callback,queue='message',no_ack=True)
    

    channel.basic_consume(callback,queue='message',no_ack=True)
    

    当你需要时) 或在线程中

    import threading
    
    import pika
    import time
    
    messages = []
    
    def recieve_messages():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            'localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='hello')
    
        def callback(ch, method, properties, body):
            messages.append(body)
    
        channel.basic_consume(callback,
                              queue='hello',
                              no_ack=True)
        # channel.start_consuming()
        mq_recieve_thread = threading.Thread(target=channel.start_consuming)
        mq_recieve_thread.start()
    
    recieve_messages()
    while True:
        print(messages)
        time.sleep(1)
    

    【讨论】:

    • Pika 不是线程安全的 - API 不支持此功能
    • Robben_Ford 是对的,不应将线程与鼠兔一起使用。 “Pika 在代码中没有任何线程概念。如果您想将 Pika 与线程一起使用,请确保在该线程中创建每个线程都有一个 Pika 连接。跨线程共享一个 Pika 连接是不安全的。” -> 来自 pika doc 常见问题解答
    猜你喜欢
    • 1970-01-01
    • 2014-02-20
    • 2014-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多