【问题标题】:How to do a simple Pika SelectConnection to send a message, in python?如何在 python 中做一个简单的 Pika SelectConnection 来发送消息?
【发布时间】:2015-05-19 17:30:50
【问题描述】:

我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息。我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦。

在我使用 amqp 库的旧代码中,我简单地声明了一个这样的类:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

然后在我的代码的其他地方调用 sendMQ("this is my message"),然后代码继续。我不需要听确认等。

有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ(“这是我的消息”)发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt。我想我只是不确定如何让我的代码在没有所有这些尝试/例外的情况下继续运行......另外,我不确定如何通过所有回调传递我的消息......

感谢任何帮助!

谢谢。

【问题讨论】:

    标签: python rabbitmq pika


    【解决方案1】:

    整个事情都是回调驱动的,因为它是一种异步的做事方式。异步消费者很容易理解,我们可以通过提供回调函数来获取消息。然而,发布者部分至少对于初学者来说有点难以理解。

    通常我们需要一个队列来进行通信,发布者会定期从中获取数据。

    使用 SelectConnection 的关键是将你的发布消息函数注册到事件循环中,这可以通过connection.add_timeout 完成。完成发布后,注册下一轮发布。

    下一个问题是将初始注册放在哪里。初始注册可以在通道开放回调中完成。

    下面是一个代码片段,以便更好地理解。请注意,尚未准备好生产。因为它只以每秒 10 条的最大速度发布消息。您需要调整发布间隔,并在一次回调中发布更多消息。

    class MQ(Object):
        def __init___(self, queue):
            self.queue = queue
        def on_channel_open(self, chn):
            self.channel = chn
            self.connection.add_timeout(0.1, self.schedule_next_message)
        def schedule_next_message(self):
            try:
                msg = self.queue.get(True, 0.01)
                self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
            except Queue.Empty:
                pass
            self.connection.add_timeout(0.1, self.schedule_next_message)
        def on_open(self, conn):
            self.connection = conn
            self.connection.channel(on_open_callback=self.on_channel_open)
        def run(self):
            # create a connection
            self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
            try:
                self.connection.ioloop.start()
            except Exception:
                print("exception in publisher")
                self.connection.close()
                self.connection.ioloop.start()
    

    将 MQ(queue).run() 放在一个单独的线程中,当你想将消息放入 mq 时,只需将其放入队列对象中即可。

    【讨论】:

    • 我认为 add_timeout 在较新的鼠兔版本中被重命名为 call_later。也许你可以更新你的答案。
    【解决方案2】:

    作为第一种方法,我建议您从文章末尾提供的发布/订阅示例开始。一旦你理解了这个简单的例子,就开始遵循最后代码块之前提供的教程。本教程包含 6 个不同的用例,并附有 Python 示例。通过前 5 个步骤,您将了解它的工作方式。您应该清楚交换(将消息路由到每个队列的实体)、绑定密钥(用于连接交换和队列的密钥)、路由密钥(与发布者的消息一起发送的密钥以及交换器使用它来将消息路由到一个或另一个队列)和队列(一个可以存储消息的缓冲区,可以有超过 1 个(如果需要,可以有 1 个)订阅者,并且可以从超过 1 个交换处获取消息,并且基于不同的绑定键)。此外,还有不止一种类型的交换(扇出,主题(这可能是你需要的)。

    如果这一切听起来很新鲜,请按照 RabbitMQ 提供的教程进行操作:

    https://www.rabbitmq.com/tutorials/tutorial-one-python.html

    pub.py:

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
    print " [x] Sent 'Hello World!'"
    connection.close()
    

    sub.py:

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    print ' [*] Waiting for messages. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
    
    channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
    
    channel.start_consuming()
    

    【讨论】:

    • 很高兴您尝试提供帮助,但他询问的是 SelectConnection 适配器。您提到的示例正在使用 BlockingConnection。
    • 带有 SelectConnection 的示例代码会更具说明性。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-23
    • 2011-02-27
    • 1970-01-01
    • 2013-11-20
    • 2014-04-30
    • 2020-10-31
    相关资源
    最近更新 更多