【问题标题】:Stream large string to S3 using boto3使用 boto3 将大字符串流式传输到 S3
【发布时间】:2019-03-20 09:28:21
【问题描述】:

我正在从 S3 下载文件,转换其中的数据,然后创建一个新文件以上传到 S3。我正在下载的文件小于 2GB,但因为我正在增强数据,当我上传它时,它非常大(200gb+)。

目前你可以通过代码想象是这样的:

files = list_files_in_s3()
new_file = open('new_file','w')
for file in files:
    file_data = fetch_object_from_s3(file)
    str_out = ''
    for data in file_data:
        str_out += transform_data(data)
    new_file.write(str_out)
s3.upload_file('new_file', 'bucket', 'key')

这个问题是'new_file'有时太大而无法放入磁盘。正因为如此,我想使用 boto3 upload_fileobj 以流形式上传数据,这样我就根本不需要在磁盘上保存临时文件。

有人可以帮忙提供一个例子吗? Python 方法似乎与我熟悉的 Java 完全不同。

【问题讨论】:

    标签: python-3.x amazon-s3 boto3


    【解决方案1】:

    您可以在 read-function 中使用 amt 参数,记录在这里:https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html

    然后使用此处记录的 MultiPartUpload 逐个上传文件: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#multipartupload

    https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html

    您应该有一个规则来删除不完整的分段上传:

    https://aws.amazon.com/es/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/

    否则您最终可能会为存储在 S3 中的不完整数据部分付费。

    我从自己的脚本中复制粘贴了一些内容来执行此操作。这显示了如何从下载到上传一直流式传输。如果您需要考虑内存限制。您也可以在上传之前将其更改为在本地存储文件。

    无论如何,您都必须使用 MultiPartUpload,因为 S3 对您可以在一次操作中上传的文件大小有限制:https://aws.amazon.com/s3/faqs/

    “单个 PUT 中可上传的最大对象为 5 GB。对于大于 100 MB 的对象,客户应考虑使用分段上传功能。”

    这是一个代码示例(我还没有测试过这里的代码):

    import boto3
    amt = 1024*1024*10 # 10 MB at the time
    session = boto3.Session(profile_name='yourprofile')
    s3res = session.resource('s3')
    source_s3file = "yourfile.file"
    target_s3file = "yourfile.file"
    source_s3obj = s3res.Object("your-bucket", source_s3file)
    target_s3obj = s3res.Object("your-bucket", target_s3file)
    
    # initiate MultiPartUpload
    mpu = target_s3obj.initiate_multipart_upload()
    partNr = 0
    parts = []
    
    body = source_s3obj.get()["Body"]   
    # get initial chunk
    chunk = body.read(amt=amt).decode("utf-8") # this is where you use the amt-parameter
    # Every time you call the read-function it reads the next chunk of data until its empty.
    # Then do something with the chunk and upload it to S3 using MultiPartUpload
    partNr += 1
    part = mpu.Part(partNr)
    response = part.upload(Body=chunk)
    parts.append({
        "PartNumber": partNr,
        "ETag": response["ETag"]
    })
    
    while len(chunk) > 0:
        # there is more data, get a new chunk
        chunk = body.read(amt=amt).decode("utf-8")
        # do something with the chunk, and upload the part
        partNr += 1
        part = mpu.Part(partNr)
        response = part.upload(Body=chunk)
        parts.append({
            "PartNumber": partNr,
            "ETag": response["ETag"]
        })
    # no more chunks, complete the upload
    part_info = {}
    part_info["Parts"] = parts
    mpu_result = mpu.complete(MultipartUpload=part_info)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-23
      • 2020-10-02
      • 1970-01-01
      • 2017-07-11
      • 2021-07-06
      • 1970-01-01
      • 2019-09-28
      • 1970-01-01
      相关资源
      最近更新 更多