【Question Title】: Merge more than 32 files in Google Cloud Storage
【Posted】: 2014-10-03 12:21:42
【Question Description】:

I have an Apache Spark script running on Google Compute Engine that writes its output to Google Cloud Storage. There are more than 300 part-00XXX files in my Cloud Storage folder, and I would like to merge them.

I tried:

poiuytrez@spark-m:~$ gsutil compose gs://mybucket/data/* gs://mybucket/myfile.csv

But I got this error:

CommandException: "compose" called with too many component objects. Limit is 32.

Any ideas on how to merge all of these part files?

【Comments】:

  • Split your files into chunks of 32 files each and compose each chunk separately. If you start with N files, you will then have N/32 files. Repeat. If you have enough memory, you can do it from the command line with subcommands and avoid reading from/writing to disk each time.

标签: google-cloud-storage apache-spark google-compute-engine


【Solution 1】:

You can only combine 32 objects in a single request, but a composite object can be built from up to 1024 components. In particular, you can compose objects 0-31 into some object 0', objects 32-63 into object 1', and so on; each of those intermediate objects can then be combined into a single object by composing (0', 1', ..., floor(300/32)').
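As a rough sketch (pure Python, illustrative names only; `plan_compose_rounds` and the `intermediate-*` names are not part of any GCS API), the chunk-and-repeat plan described above can be expressed as a planner that groups object names into batches of at most 32 per compose call:

```python
def plan_compose_rounds(names, limit=32):
    """Group object names into batches of at most `limit` per compose call,
    repeating until a single object would remain. Returns a list of rounds,
    where each round is a list of batches to compose."""
    rounds = []
    level = list(names)
    round_no = 0
    while len(level) > 1:
        batches = [level[i:i + limit] for i in range(0, len(level), limit)]
        rounds.append(batches)
        # Each batch becomes one intermediate object in the next round.
        level = [f"intermediate-{round_no}-{j}" for j in range(len(batches))]
        round_no += 1
    return rounds

# 300 sources -> 10 intermediates -> 1 final object: two compose rounds.
rounds = plan_compose_rounds([f"part-{i:05d}" for i in range(300)])
```

With 300 part files this yields two rounds: the first composes ten intermediate objects (nine batches of 32 plus one of 12), and the second composes those ten into the final object.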

【Discussion】:

  • If anyone writes a script to do this, please let me know.
【Solution 2】:

This code composes the objects under a source path into composed/, flushing each composed object once its accumulated size reaches the n MB limit.

import os
from uuid import uuid4
from itertools import count

from google.cloud import storage


def delete_directory(bucket, prefix):
    """Delete every blob under the given prefix (GCS has no real directories)."""
    print(f"cleaning temporary blobs in {prefix}")
    for blob in bucket.list_blobs(prefix=prefix):
        blob.delete()


def _run_compose(bucket, blob_name, blobs_to_compose):
    """Compose up to 32 source blobs into one destination blob (server-side copy)."""
    composed_blob = bucket.blob(blob_name)
    composed_blob.compose(blobs_to_compose, timeout=600)
    return composed_blob


def create_composed_blob_name(dest_prefix, offset=0):
    return f"{dest_prefix}/composed-{offset}-{uuid4().hex[:4]}.jsonl"


def compose(
    client,
    bucket_name,
    source_prefix,
    dest_prefix,
    create_new_line_blob,
    merge_when_size_mb=250,
    print_every_n_blobs=100,
):
    """

    Args:
        client:
        bucket_name:
        source_prefix:
        dest_prefix:
        create_new_line_blob:
        merge_when_size: compose when you hit `n` MB.
        merge_when_size_mb:
        print_every_n_blobs:

    Returns:

    """

    merge_when_size = merge_when_size_mb * 1024 * 1024  # convert MB to bytes

    bucket = client.bucket(bucket_name)

    if create_new_line_blob:
        new_line_blob = bucket.blob("tmp/new_line.txt")
        new_line_blob.upload_from_string("\n")

    blobs_to_compose = []
    composed_blob_offset = count(0)
    running_size = 0
    i = 0

    # Local manifest of every source blob that goes into the composition.
    f = open("/tmp/all_composed_blobs.txt", "w")

    for i, blob in enumerate(bucket.list_blobs(prefix=source_prefix), 1):
        f.write(f"{bucket_name}/{blob.name}\n")

        if i % print_every_n_blobs == 0:
            print(f"{i} blob processed.")

        blobs_to_compose.append(blob)
        running_size += blob.size

        # A single compose request accepts at most 32 components, so collapse
        # the pending list into one intermediate blob before hitting the limit.
        if len(blobs_to_compose) >= 31:
            blob_name = create_composed_blob_name(
                "composed/tmp", next(composed_blob_offset)
            )
            composed_blob = _run_compose(bucket, blob_name, blobs_to_compose)
            blobs_to_compose = [composed_blob]

        if create_new_line_blob:
            blobs_to_compose.append(new_line_blob)

        # Once enough bytes have accumulated, write a composed blob under
        # `dest_prefix` and reset the accumulators.
        if running_size >= merge_when_size:
            blob_name = create_composed_blob_name(
                dest_prefix, next(composed_blob_offset)
            )
            _run_compose(bucket, blob_name, blobs_to_compose)
            blobs_to_compose = []
            running_size = 0

    print(f"Last processed blob is {i}.")

    # compose the remaining, if any.
    if len(blobs_to_compose) != 0:
        blob_name = create_composed_blob_name(dest_prefix, next(composed_blob_offset))
        _run_compose(bucket, blob_name, blobs_to_compose)

    # Final operations: delete the temporary blobs and upload the manifest of blob names.
    delete_directory(bucket, prefix="composed/tmp")
    f.close()
    bucket.blob(
        f"composed/composed-files/{os.path.basename(f.name)}"
    ).upload_from_filename(f.name)


def run():
    client = storage.Client()
    bucket_name = "some_bucket"
    source_prefixes = ["some_prefix", "another_prefix"]
    for source_prefix in source_prefixes:
        compose(
            client,
            bucket_name,
            source_prefix,
            f"composed/{source_prefix}",
            create_new_line_blob=True,
            merge_when_size_mb=250,
        )


if __name__ == "__main__":
    run()

【Discussion】:
