【发布时间】:2021-07-07 15:34:03
【问题描述】:
我想编写一个集成测试检查 Python 脚本与 Azure 服务总线队列的连接。测试应该:
- 向队列发送消息,
- 确认邮件已进入队列。
测试看起来像这样:
import pytest
from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusSender
CONNECTION_STRING = <some connection string>
QUEUE = <queue name>
def send_message_to_service_bus(sender: ServiceBusSender, msg: str) -> None:
message = ServiceBusMessage(msg)
sender.send_message(message)
class TestConnectionWithQueue:
def test_message_is_sent_to_queue_and_received(self):
msg = "test message sent to queue"
expected_message = ServiceBusMessage(msg)
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE)
with sender:
send_message_to_service_bus(sender, expected_message)
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE)
with receiver:
messages_in_queue = receiver.receive_messages(max_message_count=10, max_wait_time=20)
assert any(expected_message == str(actual_message) for actual_message in messages_in_queue)
测试偶尔会奏效,但通常不会奏效。没有其他消息同时发送到队列。在我调试代码时,如果测试不起作用,变量messages_in_queue 只是一个空列表。
为什么代码不能一直工作,应该怎么做才能修复它?
【问题讨论】:
-
请编辑您的问题并包含
send_message_to_service_bus方法的代码。 -
感谢收看这个!代码已更新。
-
谢谢。只是想知道队列中有多少消息?
-
当我调试代码并检查变量
messages_in_queue时,要么是我通过此代码发送的,要么是没有。
标签: python testing integration-testing servicebus azure-servicebus-queues