【问题标题】:Write a Pandas DataFrame to Google Cloud Storage or BigQuery将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery
【发布时间】:2016-07-18 19:24:33
【问题描述】:

您好,感谢您的时间和考虑。 我正在 Google Cloud Platform / Datalab 中开发 Jupyter Notebook。 我创建了一个 Pandas DataFrame,并希望将此 DataFrame 写入 Google Cloud Storage(GCS) 和/或 BigQuery。我在 GCS 中有一个存储桶,并通过以下代码创建了以下对象:

import gcp
import gcp.storage as storage
project = gcp.Context.default().project_id    
bucket_name = 'steve-temp'           
bucket_path  = bucket_name   
bucket = storage.Bucket(bucket_path)
bucket.exists()  

我根据 Google Datalab 文档尝试了各种方法,但仍然失败。 谢谢

【问题讨论】:

  • 我开发了一个 python 包,专门用于将数据从一个位置(例如 pandas.DataFrame)传输到另一个位置(例如 BigQuery 或 Storage):google-pandas-load.readthedocs.io/en/latest/。此外,它具有 100% 的测试覆盖率。

标签: python google-cloud-storage google-cloud-platform google-cloud-datalab


【解决方案1】:

我认为您需要将其加载到纯字节变量中并在单独的单元格中使用 %%storage write --variable $sample_bucketpath(see the doc)...我仍在弄清楚...但是这与读取 CSV 文件所需的操作大致相反,我不知道它是否对写入有影响,但我必须使用 BytesIO 读取由 %% storage read 命令创建的缓冲区。 . 希望有帮助,告诉我!

【讨论】:

    【解决方案2】:

    尝试以下工作示例:

    from datalab.context import Context
    import google.datalab.storage as storage
    import google.datalab.bigquery as bq
    import pandas as pd
    
    # Dataframe to write
    simple_dataframe = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
    
    sample_bucket_name = Context.default().project_id + '-datalab-example'
    sample_bucket_path = 'gs://' + sample_bucket_name
    sample_bucket_object = sample_bucket_path + '/Hello.txt'
    bigquery_dataset_name = 'TestDataSet'
    bigquery_table_name = 'TestTable'
    
    # Define storage bucket
    sample_bucket = storage.Bucket(sample_bucket_name)
    
    # Create storage bucket if it does not exist
    if not sample_bucket.exists():
        sample_bucket.create()
    
    # Define BigQuery dataset and table
    dataset = bq.Dataset(bigquery_dataset_name)
    table = bq.Table(bigquery_dataset_name + '.' + bigquery_table_name)
    
    # Create BigQuery dataset
    if not dataset.exists():
        dataset.create()
    
    # Create or overwrite the existing table if it exists
    table_schema = bq.Schema.from_data(simple_dataframe)
    table.create(schema = table_schema, overwrite = True)
    
    # Write the DataFrame to GCS (Google Cloud Storage)
    %storage write --variable simple_dataframe --object $sample_bucket_object
    
    # Write the DataFrame to a BigQuery table
    table.insert(simple_dataframe)
    

    我使用了this 示例,以及来自datalab github site_table.py 文件作为参考。您可以在this 链接找到其他datalab 源代码文件。

    【讨论】:

    • 请注意:我相信您需要在 Python 代码之外的单独单元格中执行 %%storage 命令?
    • 这取决于您是要执行行魔术还是单元魔术命令。对于单元魔法,它是 %%storage,对于线魔法,它是 %storage。可以在与其他代码相同的单元格中使用行魔术命令。单元格魔术命令必须与其他代码位于单独的单元格中
    • 感谢您的澄清
    • 非常感谢 Anthonios... 我能够成功创建所有对象(例如,表和架构在我的 BQ 项目/数据集中)。但是,实际上没有向表中写入任何行,也没有生成错误消息。
    • 在 Jupyter Notebook 中在 table.Insert_data(out) 之后生成了一个填充表,此行位于该表的底部:(rows: 0, edw-p19090000:ClickADS2.ADS_Logit1)跨度>
    【解决方案3】:

    使用谷歌Cloud Datalab documentation

    import datalab.storage as gcs
    gcs.Bucket('bucket-name').item('to/data.csv').write_to(simple_dataframe.to_csv(),'text/csv')
    

    【讨论】:

      【解决方案4】:

      将 Pandas DataFrame 写入 BigQuery

      ​​>

      更新 @Anthonios Partheniou 的回答。
      现在的代码有点不同 - 截至 11 月 。 29 2017

      定义 BigQuery 数据集

      将包含project_iddataset_id 的元组传递给bq.Dataset

      # define a BigQuery dataset    
      bigquery_dataset_name = ('project_id', 'dataset_id')
      dataset = bq.Dataset(name = bigquery_dataset_name)
      

      定义 BigQuery 表

      将包含project_iddataset_id 和表名的元组传递给bq.Table

      # define a BigQuery table    
      bigquery_table_name = ('project_id', 'dataset_id', 'table_name')
      table = bq.Table(bigquery_table_name)
      

      创建数据集/表并写入BQ中的表

      # Create BigQuery dataset
      if not dataset.exists():
          dataset.create()
      
      # Create or overwrite the existing table if it exists
      table_schema = bq.Schema.from_data(dataFrame_name)
      table.create(schema = table_schema, overwrite = True)
      
      # Write the DataFrame to a BigQuery table
      table.insert(dataFrame_name)
      

      【讨论】:

      • 1.11.2 上的 exists() 函数对我来说在 python 中对于 google-cloud-bigquery 不存在
      【解决方案5】:

      对于使用 Dask 的任务,我有一个更简单的解决方案。您可以将您的 DataFrame 转换为 Dask DataFrame,可以将其写入 Cloud Storage 上的 csv

      import dask.dataframe as dd
      import pandas
      df # your Pandas DataFrame
      ddf = dd.from_pandas(df,npartitions=1, sort=True)
      dd.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,  
                                     storage_options={'token': gcs.session.credentials})  
      

      【讨论】:

        【解决方案6】:

        自 2017 年以来,Pandas 有一个 Dataframe to BigQuery 函数pandas.DataFrame.to_gbq

        documentation 有一个例子:

        import pandas_gbq as gbq gbq.to_gbq(df, 'my_dataset.my_table', projectid, if_exists='fail')

        参数if_exists可以设置为'fail'、'replace'或'append'

        另见example

        【讨论】:

          【解决方案7】:

          上传到谷歌云存储而不写入临时文件,只使用标准 GCS 模块

          from google.cloud import storage
          import os
          import pandas as pd
          
          # Only need this if you're running this code locally.
          os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'
          
          df = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
          
          client = storage.Client()
          bucket = client.get_bucket('my-bucket-name')
              
          bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')
          

          【讨论】:

          • 非常感谢这个不使用其他模块和现有存储桶。
          • 如果您只想将文件推送到 GCS 上的存储桶,那么这是一个更合适的解决方案。如果您想推出 json 格式,这也可以使用:bucket.blob('upload_test/test.json').upload_from_string(df.to_json(), 'text/json')
          • 如果您不希望索引作为文件中的列,请使用 df.to_csv(index=False)
          【解决方案8】:

          我花了很多时间寻找最简单的方法来解决这个问题:

          import pandas as pd
          
          df = pd.DataFrame(...)
          
          df.to_csv('gs://bucket/path')
          

          【讨论】:

          • 这非常简单。只需确保还安装gcsfs 作为先决条件(尽管它会提醒您)。如果您在 2020 年或更晚时间来到这里,只需跳过复杂性并执行此操作。
          【解决方案9】:

          Google storage

          def write_df_to_gs(df, gs_key):
              df.to_csv(gs_key)    
          

          BigQuery

          def upload_df_to_bq(df, project, bq_table):
              df.to_gbq(bq_table, project_id=project, if_exists='replace')
          

          【讨论】:

            【解决方案10】:

            在 GCS 中保存 parquet 文件并使用服务帐户进行身份验证:

            df.to_parquet("gs://<bucket-name>/file.parquet",
                           storage_options={"token": <path-to-gcs-service-account-file>}
            

            【讨论】:

              猜你喜欢
              • 2020-11-09
              • 1970-01-01
              • 1970-01-01
              • 2023-04-08
              • 2019-02-14
              • 1970-01-01
              • 2023-03-13
              • 1970-01-01
              • 2017-11-13
              相关资源
              最近更新 更多