【发布时间】:2021-09-16 17:55:29
【问题描述】:
我正在尝试使用 moto 模拟 AWS SQS,下面是我的代码
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs
@mock_sqs
def test_get_all_msg_from_queue():
#from myClass import get_msg_from_sqs
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"])
assert resp is not None
执行此操作时出现以下错误
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'
如果我尝试在 SQS 中发送消息的另一种方式(请参阅注释掉的代码#Tried this as well) 然后在我的方法 get_msg_from_sqs 中实际调用 SQS 时,我收到以下错误
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.
我在win10上用PyCharm运行,moto版本设置为
moto = "^2.2.6"
我的代码如下
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
我在这里错过了什么?
【问题讨论】:
-
你在这里使用 mock 来测试从队列中读取的消息是否不是 None。通常,您正在寻找一个特定的模式,您设置然后断言。您能否也添加 get_msg_from_sqs 的代码?
-
@vivekveeramani 我已经添加了 get_msg_from_sqs 的代码,希望对您有所帮助。
-
正如您在文档中看到的,
create_queue返回一个字典,而不是一个队列:boto3.amazonaws.com/v1/documentation/api/latest/reference/… -
@gshpychka 请再看一遍代码,queue就是我用过的变量名。
-
@gshpychka 你能添加和答案一样的东西吗,我想把赏金奖励给你
标签: python mocking amazon-sqs moto