【问题标题】:moto for python - write to sqs failsmoto for python - 写入 sqs 失败
【发布时间】:2020-02-25 00:20:36
【问题描述】:

我的代码中有一个方法可以将消息发送到 sqs。我想使用 moto 并使用 aws sqs 服务。

下面是我的代码

 def posttosqs(self,url,body):
    try:

        sqs_cli = boto3.client('sqs')
        sqs_cli.send_message(QueueUrl=url, MessageBody=body)
    except Exception as e:

        raise Exception("Posting failed to SQS")

这是我的测试用例

    @mock_sqs
    @mock_s3
    def test_case_use_moto(self):

        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='Test')
        conn = boto3.client('sqs', region_name='us-east-1')
        queue = conn.create_queue(QueueName='Test')

        os.environ["SQS_URL"] = queue["QueueUrl"]
        conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
        #SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
        ctx = context_class_object()
        event = {"body": "test"}
        resp = lambda.handle_request(event, ctx)
        assert resp["statusCode"] == 200

conn.send_message 在测试用例中有效,但方法 posttosqs 失败

error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.

我能够使用上述方法成功测试 S3 操作,但不能测试 SQS 操作

【问题讨论】:

  • 你有没有想过这个问题?

标签: python boto3 amazon-sqs python-unittest moto


【解决方案1】:

安装特定的 moto 版本对我有用,问题在于较新版本的 moto 库。

pip install moto==2.2.2

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-04-01
    • 2015-02-05
    • 2017-05-13
    • 1970-01-01
    • 2011-01-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多