【问题标题】:Python AWS SQS mocking with MOTO使用 MOTO 模拟 Python AWS SQS
【发布时间】:2021-09-16 17:55:29
【问题描述】:

我正在尝试使用 moto 模拟 AWS SQS,下面是我的代码

from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():
    
    #from myClass import get_msg_from_sqs
    
    conn = boto3.client('sqs', region_name='us-east-1')
    
    queue = conn.create_queue(QueueName='Test')
    
    os.environ["SQS_URL"] = queue["QueueUrl"]
    
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    
    
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

    resp = get_msg_from_sqs(queue["QueueUrl"]) 

    assert resp is not None

执行此操作时出现以下错误

>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'

如果我尝试在 SQS 中发送消息的另一种方式(请参阅注释掉的代码#Tried this as well) 然后在我的方法 get_msg_from_sqs 中实际调用 SQS 时,我收到以下错误

E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.

我在win10上用PyCharm运行,moto版本设置为

moto = "^2.2.6"

我的代码如下

sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

我在这里错过了什么?

【问题讨论】:

  • 你在这里使用 mock 来测试从队列中读取的消息是否不是 None。通常,您正在寻找一个特定的模式,您设置然后断言。您能否也添加 get_msg_from_sqs 的代码?
  • @vivekveeramani 我已经添加了 get_msg_from_sqs 的代码,希望对您有所帮助。
  • 正如您在文档中看到的,create_queue 返回一个字典,而不是一个队列:boto3.amazonaws.com/v1/documentation/api/latest/reference/…
  • @gshpychka 请再看一遍代码,queue就是我用过的变量名。
  • @gshpychka 你能添加和答案一样的东西吗,我想把赏金奖励给你

标签: python mocking amazon-sqs moto


【解决方案1】:

根据@gshpychka,您需要了解create_queue 的工作原理。具体来说,它返回这种形式的字典:

响应结构 (字典)--

QueueUrl(字符串)--

创建的 Amazon SQS 队列的 URL。

所以你可以使用这个 api:

import boto3
from time import sleep

conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
    QueueUrl=queue,
    MessageBody='string',
    ...)

我同意文档令人困惑。混乱可能是由于 sqs resource api 造成的,它的工作方式不同:

import boto3
from time import sleep

sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)

这是因为 this api 返回一个 Queue 对象,这可能是您所期望的。

请注意@gshpychka 已经在评论中给出了答案;我刚写出来。

【讨论】:

    【解决方案2】:

    您的 queue 变量是 create_queue 返回的字典:

    queue = conn.create_queue(QueueName='Test')
    

    它不是一个队列,因此你不能在它上面调用sendMessage

    为此,您需要创建一个队列对象:

    sqs = boto3.resource('sqs')
    response = conn.create_queue(QueueName='Test')
    queue_url = response["QueueURL"]
    queue = sqs.Queue(queue_url)
    
    queue.send_message()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-14
      • 2022-11-06
      • 1970-01-01
      • 2022-01-23
      • 2023-01-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多