【发布时间】:2020-04-25 03:12:23
【问题描述】:
在我的单元测试中,我想简单地开始消费、发布消息、接收响应并断言响应是否符合我的预期。但是,我一直在尝试这样做几个小时,但没有找到解决方案。
问题是我不能在一个类中定义一个停止消费的方法。我试过定义这样的方法:
def stop(self):
self.channel.basic_cancel()
def stop(self):
self.channel.stop_consuming()
def stop(self):
self.connection.close()
但似乎没有任何效果。我读到这是因为一旦你执行start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但是如果我这样做,那么我将修改原始的on_request,这对我的应用程序没有用处,因为连接将在第一条消息后关闭。我找到了pytest-rabbitmq,但文档对我来说不是很清楚,因此不知道我是否可以使用这个插件来实现我想要的。
对了,basic_cancel、stop_consuming和close有什么区别?
【问题讨论】:
-
@Peter 对不起,如果您有这种印象,但 StackOverflow 说 cmets 不应该被“谢谢”所困扰,而是接受正确的答案是正确的答案。不是我不礼貌,而是 StackOverflow 说要这样做。但是,如果可以的话,我会说谢谢+一些实质性的东西。关于帮助他人,我很乐意,但我不知道你必须回答这里提出的 99.9% 的问题。我有自己的 YouTube 频道和 Discord 服务器,我可以在其中帮助人们进行编程。对不起,太“平均”了。