【发布时间】:2020-02-25 00:20:36
【问题描述】:
我的代码中有一个方法可以将消息发送到 sqs。我想使用 moto 并使用 aws sqs 服务。
下面是我的代码
def posttosqs(self,url,body):
try:
sqs_cli = boto3.client('sqs')
sqs_cli.send_message(QueueUrl=url, MessageBody=body)
except Exception as e:
raise Exception("Posting failed to SQS")
这是我的测试用例
@mock_sqs
@mock_s3
def test_case_use_moto(self):
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='Test')
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
#SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
ctx = context_class_object()
event = {"body": "test"}
resp = lambda.handle_request(event, ctx)
assert resp["statusCode"] == 200
conn.send_message 在测试用例中有效,但方法 posttosqs 失败
error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.
我能够使用上述方法成功测试 S3 操作,但不能测试 SQS 操作
【问题讨论】:
-
你有没有想过这个问题?
标签: python boto3 amazon-sqs python-unittest moto