【问题标题】:Writing unit test for this S3 function为这个 S3 函数编写单元测试
【发布时间】:2022-01-11 11:44:29
【问题描述】:

我有一个 S3 函数,定义如下:

def get_unprocessed_s3_object_list(s3_client, bucket: str) -> List[str]:
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=bucket)

    all_files = {}

    for page in page_iterator:
        if page["KeyCount"] > 0:
            for item in page["Contents"]:
                all_files[item["Key"]] = item["LastModified"]

    return sorted(all_files, key=lambda k: k[1])

现在,我希望为此编写一个单元测试,并尝试使用来自 botocore 的 stubber 对象来测试它。我试过类似的东西:

def test_unprocessed_s3_objects():
    client = boto3.client("s3")
    stubber = Stubber(client)

    list_objects_v2_response = {
        "Contents": [
            {
                "Key": "sensor_1",
                "LastModified": "2021-11-30T12:58:14+00:00",
                "ETag": '"3a3c5ca43d2f01dba42314c1ca7e2237"',
                "Size": 1417,
                "StorageClass": "STANDARD",
            },
            {
                "Key": "sensor_2",
                "LastModified": "2021-11-30T12:58:05+00:00",
                "ETag": '"02bc99343bbdcbdefc9fe691b6f7deaa"',
                "Size": 1332,
                "StorageClass": "STANDARD",
            },
        ]
    }

    stubber.add_response("list_objects_v2", list_objects_v2_response)
    items = get_unprocessed_s3_object_list(client, "test")
    assert len(items) == 2

但是,这会返回 Access Denied with ListObjectsV2 ooperation。也许这与存储桶 ID 有关。我认为这会绕过实际调用。

【问题讨论】:

  • stubber.activate()能用吗?你可能还想看看 moto:github.com/spulec/moto
  • 啊,是的!那确实奏效了。我还有其他错误,但这是排序的。谢谢。你介意把它写成答案,所以我可以接受!

标签: python unit-testing amazon-s3 boto3


【解决方案1】:

调用stubber.add_response("list_objects_v2", list_objects_v2_response)后,需要调用stubber.activate()才能使用存根。

【讨论】:

    猜你喜欢
    • 2022-12-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多