【问题标题】:Create new csv file in Google Cloud Storage from cloud function通过云功能在 Google Cloud Storage 中创建新的 csv 文件
【发布时间】:2021-10-01 00:09:33
【问题描述】:

第一次使用 Google Cloud Storage。下面我有一个云功能,只要 csv 文件上传到我的存储桶内的my-folder,就会触发该功能。我的目标是在同一个文件夹中创建一个新的 csv 文件,读取上传的 csv 的内容并将每一行转换为将进入新创建的 csv 的 URL。问题是我一开始就创建新的 csv 时遇到了麻烦,更不用说实际写入它了。

我的代码:

import os.path
import csv
import sys
import json
from csv import reader, DictReader, DictWriter
from google.cloud import storage
from io import StringIO

def generate_urls(data, context):
    if context.event_type == 'google.storage.object.finalize':
        storage_client = storage.Client()
        bucket_name = data['bucket']
        bucket = storage_client.get_bucket(bucket_name)
        folder_name = 'my-folder'
        file_name = data['name']

        if not file_name.endswith('.csv'):
            return

接下来的几行来自 GCP 的 GitHub 存储库中的 an example。这是我希望创建新 csv 的时候,但没有任何反应。

        # Prepend 'URL_' to the uploaded file name for the name of the new csv
        destination = bucket.blob(bucket_name + '/' + file_name[:14] + 'URL_' + file_name[14:])
        destination.content_type = 'text/csv'
        sources = [bucket.get_blob(file_name)]
        destination.compose(sources)
        output = bucket_name + '/' + file_name[:14] + 'URL_' + file_name[14:]


        # Transform uploaded csv to string - this was recommended on a similar SO post, not sure if this works or is the right approach...
        blob = bucket.blob(file_name)
        blob = blob.download_as_string()
        blob = blob.decode('utf-8')
        blob = StringIO(blob)

        input_csv = csv.reader(blob)

在下一行出现错误:No such file or directory: 'myProjectId/my-folder/URL_my_file.csv'

        with open(output, 'w') as output_csv:
            csv_dict_reader = csv.DictReader(input_csv, )
            csv_writer = csv.DictWriter(output_csv, fieldnames=['URL'], delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
            csv_writer.writeheader()
            line_count = 0
            for row in csv_dict_reader:
                line_count += 1
                url = ''
                ...
                # code that converts each line
                ...
                csv_writer.writerow({'URL': url})
            print(f'Total rows: {line_count}')

如果有人对我如何获得它来创建新的 csv 然后写入它有任何建议,那将是一个巨大的帮助。谢谢!

【问题讨论】:

    标签: python csv google-cloud-platform google-cloud-functions google-cloud-storage


    【解决方案1】:

    可能我会说我对代码和解决方案的设计有几个问题:

    1. 据我了解 - 一方面,云功能是由 finalise 事件 Google Cloud Storage Triggers 触发的,另一方面,您不想将新创建的文件保存到同一个存储桶中。成功后,该存储桶中新对象的出现将触发您的云函数的另一个实例。这是预期的行为吗?你的云功能准备好了吗?

    2. 在本体上没有folder这样的东西。因此在这段代码中:

            folder_name = 'my-folder'
            file_name = data['name']
    

    第一行有点多余,除非您想将该变量和值用于其他用途...并且file_name 获取包含所有前缀的对象名称(您可以将它们视为“文件夹”。

    1. 您提到的示例 - storage_compose_file.py - 是关于如何将 GCS 中的几个对象组合成一个。我不确定该示例是否与您的案例相关,除非您有一些额外的要求。

    2. 现在,让我们看看这个sn-p:

            destination = bucket.blob(bucket_name + '/' + file_name[:14] + 'URL_' + file_name[14:])
            destination.content_type = 'text/csv'
            sources = [bucket.get_blob(file_name)]
            destination.compose(sources)
    

    一个。 bucket.blob - 是工厂构造函数 - 参见 API buckets description。我不确定您是否真的想使用 bucket_name 作为其参数的元素...

    b. sources - 变成一个只有一个元素的列表 - 对 GCS 存储桶中现有对象的引用。

    c。 destination.compose(sources) - 是否试图复制现有对象?如果成功 - 它可能会触发您的云函数的另一个实例。

    1. 关于类型更改
            blob = bucket.blob(file_name)
            blob = blob.download_as_string()
    

    在第一行之后,blob 变量的类型为 google.cloud.storage.blob.Blob。在第二个之后 - bytes。我认为 Python 允许这样的事情......但你真的喜欢它吗?顺便说一句,download_as_string 方法已被弃用 - 请参阅Blobs / Objects API

    1. 关于output
       output = bucket_name + '/' + file_name[:14] + 'URL_' + file_name[14:]
        
       with open(output, 'w') as output_csv:
    

    请记住 - 所有这些都发生在云函数的内存中。与 GCS 的 blob 桶无关。如果你想在云函数中使用临时文件 - 你将在 /tmp 目录中使用它们 - Write temporary files from Google Cloud Function 我猜你会因为这个问题而收到错误。

    => 提出一些建议。

    您可能希望将该对象下载到云函数内存中(到/tmp 目录中)。然后您想处理源文件并将结果保存在源附近。然后您想将结果上传到 另一个(不是源)存储桶。如果我的假设是正确的,我会建议逐步实施这些事情,并检查您是否在每一步都获得了预期的结果。

    【讨论】:

    • 感谢您的详细回复。关于第 2 点,我在一些条件检查中使用了folder_namefile_name,我没有在此处包含这些条件检查,以使问题更简洁。一种情况是避免处理包含“URL_”的新文件,这样函数就不会在循环中被触发。我现在可以看到我感到困惑的地方。我会尝试你的建议并在/tmp 中创建一个新文件并在那里写信。我认为有可能在我写完新文件后将其从/tmp 移动到my-folder
    • 它可能会起作用。注意云功能超时,最大分配内存(那些临时文件吃掉分配的内存),将对象写入GCS存储桶的IAM权限(运行云功能的服务帐户应具有相关权限。记住您付费时间,从而将结果文件保存到原始存储桶中 - 你浪费你的钱。
    • 所以我能够在/tmp 目录中创建新的 csv,但现在我在尝试将其移动到我的存储桶文件夹时遇到错误。在这里发布了一个新问题:stackoverflow.com/questions/69413379/…
    【解决方案2】:

    您可以通过两种方式在 Google Cloud Storage 中保存 csv。

    或者,您可以使用“requirements.txt”中的gcsfs 包将其直接保存到GCS,或者使用容器的/tmp 文件夹,然后从那里将其推送到GCS 存储桶。

    使用 Python 包“gcsfs”的强大功能

    gcsfs 代表“谷歌云存储文件系统”。添加

    gcsfs==2021.11.1
    

    或您的“requirements.txt”的另一个版本。您不要在代码中直接使用此包名称,而是它的安装只允许您直接保存到 Google Cloud Storage,不需要临时 /tmp 并推送到 GCS 存储桶目录。您也可以将文件存储在子目录中。

    您可以保存数据框,例如:

    df.to_csv('gs://MY_BUCKET_NAME/MY_OUTPUT.csv')
    

    或:

    df.to_csv('gs://MY_BUCKET_NAME/MY_DIR/MY_OUTPUT.csv')
    

    或在创建 CF 时使用第一个菜单步骤的环境变量:

    from os import environ
    
    df.to_csv(environ["CSV_OUTPUT_FILE_PATH"], index=False)
    

    不确定是否需要这样做,但我看到了一个 gcsfs 包与安装在一起的示例

    fsspec==2021.11.1 
    

    添加它不会有什么坏处。不过,我在 GCS 上将一个小 df 保存到 csv 的测试不需要这个包。由于我不确定这个帮助模块,引用:

    Purpose(fsspec):

    要为文件系统接口生成模板或规范, 应遵循特定的实现,以便应用程序 使用它们可以依赖于共同的行为,而不必 担心与任何具体的内部实施决策 给定后端。许多这样的实现都包含在这个包中, 或在 s3fs 和 gcsfs 等姊妹项目中。

    此外,如果这是精心设计的,那么附加功能, 例如文件系统的键值存储或 FUSE 挂载 实现可能对所有实现“免费”可用。

    首先在容器的“/tmp”中,然后推送到 GCS

    这是一个示例,说明如何执行 what the other answer says about storing it at first in the container's /tmp (and only there, no other dir is possible),然后将其移动到您选择的存储桶中。您也可以将其保存到同时存储云函数源代码的存储桶中,对照另一个答案的最后一句(已测试,有效):

    # function `write_to_csv_file()` not used but might be helpful if no df at hand:
    #def write_to_csv_file(file_path, file_content, root):
    #    """ Creates a file on runtime. """
    #    file_path = path.join(root, file_path)
    #
    #    # If file is a binary, we rather use 'wb' instead of 'w'
    #    with open(file_path, 'w') as file:
    #        file.write(file_content)    
    
    def push_to_gcs(file, bucket):
        """ Writes to Google Cloud Storage. """
        file_name = file.split('/')[-1]
        print(f"Pushing {file_name} to GCS...")
        blob = bucket.blob(file_name)
        blob.upload_from_filename(file)
        print(f"File pushed to {blob.id} succesfully.")        
    
    # Root path on CF will be /workspace, while on local Windows: C:\
    root = path.dirname(path.abspath(__file__))
    file_name = 'test_export.csv'
    # This is the main step: you *must* use `/tmp`:
    file_path = '/tmp/' + file_name
    
    d = {'col1': [1, 2], 'col2': [3, 4]}
    df = pd.DataFrame(data=d)
    df.to_csv(path.join(root, file_path), index = False)
    
    # If you have a df anyway, `df.to_csv()` is easier. 
    # The following file writer should rather be used if you have records instead (here: dfAsString). Since we do not use the function `write_to_csv_file()`, it is also commented out above, but can be useful if no df at hand.
    # dfAsString = df.to_string(header=True, index=False)   
    # write_to_csv_file(file_path, dfAsString, root)
    
    # Cloud Storage Client
    # Move csv file to Cloud Storage
    storage_client = storage.Client()
    bucket_name = MY_GOOGLE_STORAGE_BUCKET_NAME
    bucket = storage_client.get_bucket(bucket_name)
    push_to_gcs(path.join(root, file_path), bucket)
    

    【讨论】:

      猜你喜欢
      • 2021-06-16
      • 2019-09-27
      • 2018-01-03
      • 1970-01-01
      • 2016-08-19
      • 2013-04-30
      • 1970-01-01
      • 1970-01-01
      • 2015-08-28
      相关资源
      最近更新 更多