【发布时间】:2022-01-11 11:44:29
【问题描述】:
我有一个 S3 函数,定义如下:
def get_unprocessed_s3_object_list(s3_client, bucket: str) -> List[str]:
paginator = s3_client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket)
all_files = {}
for page in page_iterator:
if page["KeyCount"] > 0:
for item in page["Contents"]:
all_files[item["Key"]] = item["LastModified"]
return sorted(all_files, key=lambda k: k[1])
现在,我希望为此编写一个单元测试,并尝试使用来自 botocore 的 stubber 对象来测试它。我试过类似的东西:
def test_unprocessed_s3_objects():
client = boto3.client("s3")
stubber = Stubber(client)
list_objects_v2_response = {
"Contents": [
{
"Key": "sensor_1",
"LastModified": "2021-11-30T12:58:14+00:00",
"ETag": '"3a3c5ca43d2f01dba42314c1ca7e2237"',
"Size": 1417,
"StorageClass": "STANDARD",
},
{
"Key": "sensor_2",
"LastModified": "2021-11-30T12:58:05+00:00",
"ETag": '"02bc99343bbdcbdefc9fe691b6f7deaa"',
"Size": 1332,
"StorageClass": "STANDARD",
},
]
}
stubber.add_response("list_objects_v2", list_objects_v2_response)
items = get_unprocessed_s3_object_list(client, "test")
assert len(items) == 2
但是,这会返回 Access Denied with ListObjectsV2 ooperation。也许这与存储桶 ID 有关。我认为这会绕过实际调用。
【问题讨论】:
-
加
stubber.activate()能用吗?你可能还想看看 moto:github.com/spulec/moto -
啊,是的!那确实奏效了。我还有其他错误,但这是排序的。谢谢。你介意把它写成答案,所以我可以接受!
标签: python unit-testing amazon-s3 boto3