【问题标题】:Stream huge zip files on S3 using Lambda and boto3使用 Lambda 和 boto3 在 S3 上流式传输巨大的 zip 文件
【发布时间】:2021-09-23 09:36:00
【问题描述】:

我在 S3 上将一堆 CSV 文件压缩为一个 zip。我只需要使用 AWS lambda 函数处理 zip 中的一个 CSV 文件

import boto3
from zipfile import ZipFile

BUCKET = 'my-bucket'
s3_rsc = boto3.resource('s3')

def zip_stream(zip_f='app.zip', bkt=BUCKET, rsc=s3_rsc):
    obj = rsc.Object(
        bucket_name=bkt,
        key=zip_f
    )

    return ZipFile(BytesIO(obj.get()['Body'].read()))


zip_obj = zip_stream()
csv_dat = zip_obj.read('one.csv')

上面的 sn-p 可以很好地处理测试 zip 文件,但是如果 zip 文件大小超过 0.5G,它会失败并出现 内存错误

错误信息

{ "errorMessage": "", "errorType": "MemoryError", "stackTrace": [ " 文件 "/var/task/lambda_function.py", 第 12 行, 在处理程序中\n all_files = files_in_zip()\n", " 文件 "/var/task/lambda_function.py", 第 36 行, 在 files_in_zip\n zippo = zip_stream()\n", " File "/var/task/lambda_function.py", line 32, in zip_stream\n return ZipFile(BytesIO(obj.get()['Body'].read()))\n", " 文件 "/var/runtime/botocore/response.py",第 77 行,在 read\n chunk = self._raw_stream.read(amt)\n", " File "/var/runtime/urllib3/response.py", line 515, in read\n data = self._fp.read() if not fp_closed else b""\n", " 文件 "/var/lang/lib/python3.8/http/client.py", 第 468 行, 在 read\n s = self._safe_read(self.length)\n", " 文件 "/var/lang/lib/python3.8/http/client.py",第 609 行,在 _safe_read\n data = self.fp.read(amt)\n" ] }

是否有stream/lazyload zipfile 的选项来缓解内存问题?

注意 - 我还提到了一篇旧帖子 (How can I use boto to stream a file out of Amazon S3 to Rackspace Cloudfiles?),它谈到了流式传输文件而不是 zip

【问题讨论】:

  • 还可以考虑简单地为 Lambda 函数配置更多 RAM。
  • 您可以考虑使用smart-open 根据需要包装来自 S3 的流式数据工作。
  • @AnonCoward smart-open 似乎没有包装/流式传输 zip 格式的文件,我试图包装 io.BufferedReader(response['Body']) 但未能成功。如果 smart-open 可以处理 zip 格式的文件,你能举个例子吗
  • @jarmod 您可以将 Lambda 函数的 RAM 最多增加到 10G,这是一种解决方法,但是,这对我来说似乎是一个昂贵的举动。
  • 不一定贵很多。使用更多 RAM,您将获得相应的更多 CPU 和网络 i/o,因此您的进程可能会运行得更快,因此您将每毫秒支付更多费用,但总持续时间更短。也许尝试aws-lambda-power-tuning 以获得最佳组合。

标签: python amazon-web-services amazon-s3


【解决方案1】:

根据您的具体需求,您可以使用smart-open 来处理 zip 文件的读取。如果您可以将 CSV 数据放入 Lambda 的 RAM 中,则直接调用相当简单:

from smart_open import smart_open
from io import TextIOWrapper, BytesIO

def lambda_handler(event, context):
    # Simple test, just calculate the sum of the first column of a CSV file in a Zip file
    total_sum, row_count = 0, 0
    # Use smart open to handle the byte range requests for us
    with smart_open("s3://example-bucket/many_csvs.zip", "rb") as f:
        # Wrap that in a zip file handler
        zip = zipfile.ZipFile(f)
        # Open a specific CSV file in the zip file
        zf = zip.open("data_101.csv")
        # Read all of the data into memory, and prepare a text IO wrapper to read it row by row
        text = TextIOWrapper(BytesIO(zf.read()))
        # And finally, use python's csv library to parse the csv format
        cr = csv.reader(text)
        # Skip the header row
        next(cr)
        # Just loop through each row and add the first column
        for row in cr:
            total_sum += int(row[0])
            row_count += 1

    # And output the results
    print(f"Sum {row_count} rows for col 0: {total_sum}")

我使用包含数百个 CSV 文件的 1gb zip 文件对此进行了测试。我选择的 CSV 文件在未压缩时约为 12mb,即 100,000 行,因此在 Lambda 环境中的 RAM 中感觉很好,即使 RAM 限制为 128mb。

如果您的 CSV 文件无法像这样一次加载,您需要注意分段加载,缓冲读取,以免浪费时间逐行读取并强制智能-打开一次加载小块。

【讨论】:

    猜你喜欢
    • 2017-05-11
    • 2019-03-20
    • 1970-01-01
    • 1970-01-01
    • 2023-03-19
    • 1970-01-01
    • 2020-05-14
    • 2015-01-05
    相关资源
    最近更新 更多