【问题标题】:Bigquery - Insert new data row into table by pythonBigquery - 通过python将新数据行插入表中
【发布时间】:2016-08-08 23:30:33
【问题描述】:

我阅读了很多关于google bigquery-python的文档,但我不明白如何通过python代码管理bigquery数据。

首先,我创建了一个新表,如下所示。

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials = credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
...
]}

table = service.tables().insert(body = table, **dataset_ref).execute()

然后我想在这个表中插入一个数据,所以我试着像下面那样做。

fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body = fetch_list, **table_ref).execute()

但什么也没发生。

如何将新数据更新到 bigquery 表中?

请给我一些示例代码。

【问题讨论】:

    标签: python google-bigquery


    【解决方案1】:

    2018 年 11 月编辑:

    这个问题的答案已经过时了,因为自上一篇文章以来谷歌云客户端已经有了很大的发展。

    官方文档已经包含了所有需要的信息; here 您可以找到流式插入所需的一切,this one 对目前可用的所有方法进行了完整概述(您还可以在每个页面和每个方法上找到 Python 代码示例)。

    原答案:

    您可以使用几种不同的方式将数据插入 BQ。

    为了更深入地了解 python-api 的工作原理,这里有您需要的一切:bq-python-api(起初文档有点吓人,但在您掌握它之后就相当简单了)。

    我使用两种主要方法将数据插入 BQ。第一个是data streaming,它应该在您可以实时逐行插入时使用。代码示例:

    import uuid
    def stream_data(self, table, data, schema):
        # first checks if table already exists. If it doesn't, then create it
        r = self.service.tables().list(projectId=your_project_id,
                                         datasetId=your_dataset_id).execute()
        table_exists = [row['tableReference']['tableId'] for row in
                        r['tables'] if
                        row['tableReference']['tableId'] == table]
        if not table_exists:
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': your_project_id,
                    'datasetId': your_dataset_id
                },
                'schema': schema
            }
            self.service.tables().insert(projectId=your_project_id,
                                         datasetId=your_dataset_id,
                                         body=body).execute()
    
        # with table created, now we can stream the data
        # to do so we'll use the tabledata().insertall() function.
        body = {
            'rows': [
                {
                    'json': data,
                    'insertId': str(uuid.uuid4())
                }
            ]
        }
        self.service.tabledata().insertAll(projectId=your_project_id),
                                           datasetId=your_dataset_id,
                                           tableId=table,
                                             body=body).execute(num_retries=5)
    

    这里我的self.service 对应于您的service 对象。

    我们项目中输入data 的示例:

    data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}
    

    及其通讯员schema

    schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
    

    另一种插入数据的方法是使用job insert 函数。正如您在文档中看到的,它接受多个数据源。我有一个示例,说明如何通过将查询结果加载到另一个表中来做到这一点:

    def create_table_from_query(self,
                                query,
                                dest_table,
                                how):
        body = {
            'configuration': {
                'query': {
                    'destinationTable': {
                        'projectId': your_project_id,
                        'tableId': dest_table,
                        'datasetId': your_dataset_id
                    },
                    'writeDisposition': how,
                    'query': query,
                },
            }
        }
    
        response = self.connector.jobs().insert(projectId=self._project_id,
                                                body=body).execute()
        self.wait_job_completion(response['jobReference']['jobId'])
    
    def wait_job_completion(self, job_id):
        while True:
            response = self.connector.jobs().get(projectId=self._project_id,
                                                 jobId=job_id).execute()
            if response['status']['state'] == 'DONE':
                return
    

    how 输入接受文档中此字段的可用选项(例如“WRITE_TRUNCATE”或“WRITE_APPEND”)。

    例如,您可以从 csv 文件加载数据,在这种情况下,configuration 变量将被定义为:

    "configuration": {
      "load": {
        "fieldDelimiter": "\t"
        "sourceFormat": "CSV"
        "destinationTable": {
          "projectId": your_project_id,
          "tableId": table_id,
          "datasetId": your_dataset_id
        },
        "writeDisposition": "WRITE_TRUNCATE"
        "schema": schema,
        "sourceUris": file_location_in_google_cloud_storage
      },
    }
    

    (以制表符分隔的 csv 文件为例。它也可以是 json 文件,文档将引导您完成可用选项)。

    运行 jobs() 需要一些时间才能完成(这就是我们创建 wait_job_completion 方法的原因)。不过与实时流媒体相比,它应该更便宜。

    如有任何问题,请告诉我们,

    【讨论】:

    猜你喜欢
    • 2016-12-22
    • 2015-04-18
    • 2021-11-14
    • 2020-03-04
    • 2012-07-03
    • 2020-08-08
    • 1970-01-01
    • 1970-01-01
    • 2018-08-20
    相关资源
    最近更新 更多