【问题标题】:Copy from Google Cloud Storage Bucket to S3 Bucket从 Google Cloud Storage 存储桶复制到 S3 存储桶
【发布时间】:2018-06-18 13:02:46
【问题描述】:

我设置了一个气流工作流程,将一些文件从 s3 摄取到 Google Cloud 存储,然后运行 ​​sql 查询工作流程以在 Big Query 上创建新表。在工作流结束时,我需要将最后一个 Big Query 表的输出推送到 Google Cloud Storage,然后从那里推送到 S3。

我已经使用BigQueryToCloudStorageOperator python 运算符破解了大查询表到谷歌云存储的传输,没有任何问题。但是,从 Google Cloud Storage 到 S3 的转移似乎是一条不太受欢迎的路线,我一直无法找到可以在我的 Airflow 工作流程中自动化的解决方案。

我知道rsyncgsutil 的一部分,并且已经开始工作(参见帖子Exporting data from Google Cloud Storage to Amazon S3),但我无法将其添加到我的工作流程中。

我有一个在计算引擎实例上运行的 dockerised 气流容器。

非常感谢帮助解决这个问题。

非常感谢!

【问题讨论】:

    标签: python amazon-s3 google-cloud-platform google-cloud-storage airflow


    【解决方案1】:

    所以我们也使用rsync 在 S3 和 GCS 之间移动数据,

    您首先需要让 bash 脚本正常工作,例如 gsutil -m rsync -d -r gs://bucket/key s3://bucket/key

    对于 s3,您还需要提供 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 作为环境变量。

    然后定义您的 BashOperator 并将其放入您的 DAG 文件中

    rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
                                    bash_command='Your rsync script',
                                    dag=dag)
    

    【讨论】:

    • 感谢承志 - 您如何定义计算引擎上的 S3 信用?在我的 Mac 本地,我已将它们添加到 .boto 配置文件中,但我无法在我的计算引擎实例上找到它的等价物。
    • 你可以尝试 ssh 并在计算引擎上添加 .boto 配置文件吗?
    • 我使用 DataProc 启动了一个 Hadoop 集群,并使用 distcp 完成了传输。一旦气流DataProcHadoopOperator 工作,我将更新答案。
    • 您还可以在 bash 命令中使用导出命令作为凭证。 bash_command='导出 AWS_ACCESS_KEY_ID="your_key";导出 AWS_SECRET_ACCESS_KEY="your_secret"; gsutil -m rsync -d -r gs://bucket/key s3://bucket/key'
    【解决方案2】:

    Google 建议使用 transfer service 在云平台之间进行传输。您可以使用他们的 python API 以编程方式设置传输。通过这种方式,数据直接在 S3 和谷歌云存储之间传输。使用gsutilrsync 的缺点是数据必须通过执行rsync 命令的机器/实例。这可能是一个瓶颈。

    Google Cloud Storage Transfer Service Doc

    【讨论】:

    【解决方案3】:

    我需要使用 AWS Lambda 将对象从 GC 存储桶复制到 S3。

    Python boto3 库允许从 GC 存储桶中列出和下载对象。

    以下是将“sample-data-s3.csv”对象从 GC 存储桶复制到 s3 存储桶的示例 lambda 代码。

    import boto3
    import io
    
    s3 = boto3.resource('s3')
    
    google_access_key_id="GOOG1EIxxMYKEYxxMQ"
    google_access_key_secret="QifDxxMYSECRETKEYxxVU1oad1b"
    
    gc_bucket_name="my_gc_bucket"
    
    
    def get_gcs_objects(google_access_key_id, google_access_key_secret,
                         gc_bucket_name):
        """Gets GCS objects using boto3 SDK"""
        client = boto3.client("s3", region_name="auto",
                              endpoint_url="https://storage.googleapis.com",
                              aws_access_key_id=google_access_key_id,
                              aws_secret_access_key=google_access_key_secret)
    
        # Call GCS to list objects in gc_bucket_name
        response = client.list_objects(Bucket=gc_bucket_name)
    
        # Print object names
        print("Objects:")
        for blob in response["Contents"]:
            print(blob)    
    
        object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
        f = io.BytesIO()
        client.download_fileobj(gc_bucket_name,"sample-data.csv",f)
        object.put(Body=f.getvalue())
    
    def lambda_handler(event, context):
        get_gcs_objects(google_access_key_id,google_access_key_secret,gc_bucket_name) 
    

    您可以循环通过blob从GC桶中下载所有对象。

    希望这对想要使用 AWS lambda 将对象从 GC 存储桶传输到 s3 存储桶的人有所帮助。

    【讨论】:

      【解决方案4】:

      最简单的整体选项是gsutil rsync,但是在某些情况下 rsync 可能会占用太多资源或不够快。

      结合其他选择:

      【讨论】:

        猜你喜欢
        • 2017-11-29
        • 1970-01-01
        • 2017-07-06
        • 2022-01-07
        • 2017-11-23
        • 1970-01-01
        • 2019-08-07
        • 1970-01-01
        • 2020-08-07
        相关资源
        最近更新 更多