【问题标题】:Loading multiple file from cloud storage to big query in different tables将多个文件从云存储加载到不同表中的大查询
【发布时间】:2021-07-29 16:19:49
【问题描述】:

我是 GCP 的新手,我可以将 1 个文件从我的 VM 获取到 GCS,然后将其传输到 bigquery。 如何将多个文件从 GCS 传输到 Bigquery。我知道通配符 URi 是它的解决方案,但下面的代码还需要进行哪些其他更改?

def hello_gcs(event, context):
    from google.cloud import bigquery
    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    table_id = "test_project.test_dataset.test_Table"
  
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = "gs://test_bucket/*.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print(f"Processing file: {file['name']}.")

由于可能有多个上传,所以我无法定义特定的表名或文件名?是否可以自动执行此任务?

每当 GCS 存储桶中有新文件时,PubSub 都会触发此功能。 谢谢

【问题讨论】:

    标签: python-3.x google-bigquery google-cloud-functions google-cloud-storage google-cloud-pubsub


    【解决方案1】:

    要在单个 Cloud Function 调用中将多个 GCS 文件加载到多个 BQ 表中,您需要列出这些文件,然后对其进行迭代,为每个文件创建一个加载作业,就像您为一个文件所做的那样。但是在单个函数调用中完成所有这些工作,有点破坏了使用 Cloud Functions 的目的。

    如果您的要求不强制您这样做,您可以利用 Cloud Functions 的强大功能,让每个文件在添加到存储桶后触发单个 CF,因为它是一个事件驱动的函数。请参考https://cloud.google.com/functions/docs/writing/background#cloud-storage-example。每次有指定的活动时都会触发它,其中会有事件元数据。

    因此,在您的应用程序中,我们可以获取触发事件的文件的名称,而不是获取 URI 中的整个存储桶内容,并将该文件仅加载到 bigquery 表中,如下面的代码示例所示。

    您可以通过以下方式解决代码中的问题。在您的代码中尝试以下更改。

    • 您可以从云函数事件字典中提取有关事件的详细信息和触发事件的文件的详细信息。在您的情况下,我们可以将文件名作为 event['name'] 并更新“uri”变量。

    • 生成一个新的唯一table_id(这里以table_id 为例,与文件名相同)。您可以根据需要使用其他方案生成唯一的文件名。

    参考下面的代码

     def hello_gcs(event, context):
       from google.cloud import bigquery
    
       client = bigquery.Client() # Construct a BigQuery client object.
    
       print(f"Processing file: {event['name']}.") #name of the file which triggers the function
     
       if ".csv" in event['name']:
          # bq job config
           job_config = bigquery.LoadJobConfig(
           autodetect=True,
           skip_leading_rows=1,
           source_format=bigquery.SourceFormat.CSV,
         )
    
       file_name = event['name'].split('.')  
       table_id = "<project_id>.<dataset_name>."+file_name[0] #[generating new id for each table]
    
       uri = "gs://<bucket_name>/"+event['name']
       load_job = client.load_table_from_uri(
           uri, table_id, job_config=job_config
       )  # Make an API request.
       load_job.result()  # Waits for the job to complete.
       destination_table = client.get_table(table_id)  # Make an API request.
       print("Table {} uploaded.".format(table_id))
    

    【讨论】:

      【解决方案2】:

      要将多个文件从 GCS 传输到 Bigquery,您只需遍历所有文件即可。下面是带有 cmets 的工作代码示例。 我相信eventcontext(函数参数)默认由谷歌云函数处理,所以不需要修改那部分。或者,您可以利用 event 而不是循环来简化代码。

      def hello_gcs(event, context):
          import re
          from google.cloud import storage
          from google.cloud import bigquery
          from google.cloud.exceptions import NotFound
      
          bq_client = bigquery.Client()
          bucket = storage.Client().bucket("bucket-name")
          for blob in bucket.list_blobs(prefix="folder-name/"):
              if ".csv" in blob.name: #Checking for csv blobs as list_blobs also returns folder_name
                 job_config = bigquery.LoadJobConfig(
                     autodetect=True,
                     skip_leading_rows=1,
                     source_format=bigquery.SourceFormat.CSV,
                 )
                 csv_filename = re.findall(r".*/(.*).csv",blob.name) #Extracting file name for BQ's table id
                 bq_table_id = "project-name.dataset-name."+csv_filename[0] # Determining table name
             
                 try: #Check if the table already exists and skip uploading it.
                     bq_client.get_table(bq_table_id)
                     print("Table {} already exists. Not uploaded.".format(bq_table_id))
                 except NotFound: #If table is not found, upload it.    
                     uri = "gs://bucket-name/"+blob.name
                     print(uri)
                     load_job = bq_client.load_table_from_uri(
                         uri, bq_table_id, job_config=job_config
                     )  # Make an API request.
                     load_job.result()  # Waits for the job to complete.
                     destination_table = bq_client.get_table(bq_table_id)  # Make an API request.
                     print("Table {} uploaded.".format(bq_table_id))
      
      

      【讨论】:

      • 我可以看到这被标记为答案。任何人都可以在这里帮助我,因为当我运行此代码时出现以下错误:函数(数据,上下文)文件“/workspace/main.py”,行8、在 csv_loader table_id=client.get_table(os.environ["TABLE"]) 文件 "/opt/python3.9/lib/python3.9/os.py", 第 679 行, in getitem 从无 KeyError 提高 KeyError(key): 'TABLE'
      【解决方案3】:

      如果我错了,请纠正我,我了解您的云功能是由 finalize 事件 (Google Cloud Storage Triggers) 触发的,当存储桶中出现新文件(或对象)时。这意味着桶中的每个“新”对象都有一个事件。因此,每个对象至少调用一次云函数。

      上面的链接有一个来自event 字典的数据示例。那里有大量信息,包括要加载的对象(文件)的详细信息。

      例如,您可能希望在文件名模式和目标 BigQuery 表之间进行一些映射以进行数据加载。使用该地图,您将能够决定应使用哪个表进行加载。或者您可能有一些其他机制来选择目标表。

      其他一些需要考虑的事情:

      1. 异常处理 - 如果 数据未加载(出于任何原因)?通知谁以及如何通知? 要做什么(更正源数据或目标表 并且)重复加载等。
      2. 如果加载时间比云函数长,会发生什么情况 超时(目前最多 540 秒)?
      3. 如果有多个云功能会发生什么 来自一个 finalize 事件的调用,或者来自不同事件的调用,但 从语义上相同的源文件(重复数据,重复, 等)

      不要回答我,如果你还没做过,就想想这种情况。

      【讨论】:

        【解决方案4】:

        如果您的数据源是 GCS,而您的目标是 BQ,您可以使用 BigQuery Data Transfer Service 在 BQ 中对您的数据进行 ETL。每个 Transfer 作业都是针对某个 Table 的,您可以选择是否要使用 Streaming 模式附加或覆盖某个 Table 中的数据。

        您也可以安排此作业。每日、每周等。

        【讨论】:

        • 由于一些限制,我想用云函数和python编程来做。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2012-11-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多