【问题标题】:How to do a simple unit testing with RabbitMQ in python?如何在 python 中使用 RabbitMQ 进行简单的单元测试?
【发布时间】:2020-04-25 03:12:23
【问题描述】:

在我的单元测试中,我想简单地开始消费、发布消息、接收响应并断言响应是否符合我的预期。但是,我一直在尝试这样做几个小时,但没有找到解决方案。

问题是我不能在一个类中定义一个停止消费的方法。我试过定义这样的方法:

def stop(self):
    self.channel.basic_cancel()
def stop(self):
    self.channel.stop_consuming()
def stop(self):
    self.connection.close()

但似乎没有任何效果。我读到这是因为一旦你执行start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但是如果我这样做,那么我将修改原始的on_request,这对我的应用程序没有用处,因为连接将在第一条消息后关闭。我找到了pytest-rabbitmq,但文档对我来说不是很清楚,因此不知道我是否可以使用这个插件来实现我想要的。

对了,basic_cancelstop_consumingclose有什么区别?

【问题讨论】:

  • @Peter 对不起,如果您有这种印象,但 StackOverflow 说 cmets 不应该被“谢谢”所困扰,而是接受正确的答案是正确的答案。不是我不礼貌,而是 StackOverflow 说要这样做。但是,如果可以的话,我会说谢谢+一些实质性的东西。关于帮助他人,我很乐意,但我不知道你必须回答这里提出的 99.9% 的问题。我有自己的 YouTube 频道和 Discord 服务器,我可以在其中帮助人们进行编程。对不起,太“平均”了。

标签: rabbitmq pika


【解决方案1】:

我对您的情况没有清楚的了解!以我的理解,你可以在同一个方法中创建连接和通道,这样你就可以在需要时发布、消费、断言和停止消费

希望这会有所帮助!

def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # asset message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        channel.basic_publish(exchange='', routing_key='', body=message')

    publish('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming - 取消所有消费者,指示 start_sumption 循环退出。

basic_cancel - 此方法取消消费者。消费者标签将作为输入。

close - 关闭连接/通道

Reference

【讨论】:

    猜你喜欢
    • 2019-10-25
    • 2012-09-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-01-08
    • 1970-01-01
    相关资源
    最近更新 更多