【问题标题】:Creating a new table and setting the expiration date in bigquery using Python使用 Python 在 bigquery 中创建新表并设置过期日期
【发布时间】:2017-12-09 22:10:18
【问题描述】:

这是我的代码,它从 firebase 提取实时数据库,将其格式化为 Json,上传到云端,然后上传到 BQ。

#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid

from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery

firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())

with open("firetobq.json", "w") as outfile:
  for id in id_keys:
    json.dump(result[id], outfile, indent=None)
    outfile.write("\n")

client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
  blob.upload_from_file(f)

dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'


def load_data_from_gcs(dataset, test12, source):
    bigquery_client = bigquery.Client(dataset)
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table('test12')
    job_name = str(uuid.uuid4())
    job1.create_disposition = 'WRITE_TRUNCATE'
    job1.begin()

    job= bigquery_client.load_table_from_storage(
        job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
    job.source_format = 'NEWLINE_DELIMITED_JSON'

    job.begin()
    wait_for_job(job)

def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

load_data_from_gcs(dataset, 'test12', source)

如何将其更改为,而不是导入表 test12 中的数据来创建新表并让该表在 1 周后过期。 (我很确定设置到期日期的命令必须以秒为单位。1 周 = 604800 秒)我知道如何通过命令行设置到期日期,但宁愿在这里自动完成。

这是我在添加 job1 后收到的错误。

Traceback (most recent call last):
  File "firebasetobq2.py", line 63, in <module>
    load_data_from_gcs(dataset, 'test12', source)
  File "firebasetobq2.py", line 44, in load_data_from_gcs
    job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined

【问题讨论】:

标签: python google-bigquery create-table


【解决方案1】:

如果你想为你的表设置一个过期时间,这可能会奏效:

from datetime import datetime, timedelta
from google.cloud.bigquery.schema import SchemaField

def load_data_from_gcs(dataset,
                   table_name,
                   table_schema,
                   source,
                   source_format,
                   expiration_time):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset)
    table = dataset.table(table_name)
    table.schema = table_schema
    table.expires = expiration_time
    if not table.created:
        table.create()

    job_name = str(uuid.uuid4())
    job= bigquery_client.load_table_from_storage(
        job_name, table, source)
    job.source_format = source_format

    job.begin()
    wait_for_job(job)

dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table.schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)

load_data_from_gcs(dataset,
                   table_name,
                   table_schema,
                   gcs_source,
                   source_format,
                   expiration_time)

请注意,唯一的区别是它设置的代码行:

table.expires = expiration_time

其值必须是datetime 类型(此处定义为expiration_time = datetime.now() + timedelta(seconds=604800)

不确定是否可以使用 Python API 使用架构自动检测,但您仍然可以使用 SchemaFields 发送此信息。例如,如果您的表有两个字段 user_idjob_id,两者都是 INTEGERS,那么架构将是:

table_schema = [SchemaField('user_id', field_type='INT64'),
                SchemaField('job_id', field_type='INT64')]

有关架构如何在 BigQuery 中工作的更多信息,您可以找到 here

[编辑]:

刚刚看到你的other question,如果你想截断表格然后写入数据,你可以这样做:

job.create_disposition = 'WRITE_TRUNCATE'
job.begin()

在您的 load_data_from_gcs 函数中。这将自动删除表并使用存储文件中的数据创建一个新表。您不必为它定义一个架构,因为它之前已经定义过(因此对您来说可能是一个更容易的解决方案)。

【讨论】:

  • 我现在收到这个错误,因为我尝试使用 'Write_Truncate;里面还有我昨天工作的代码。
  • 文件“file.py”,第 28 行,在 bucket = client.get_bucket('bucket') 文件“/Library/Python/2.7/site-packages/google/cloud/storage /client.py”,第 173 行,在 get_bucket bucket.reload(client=self) 文件“/Library/Python/2.7/site-packages/google/cloud/storage/_helpers.py”,第 99 行,重新加载 _target_object= self) 文件“/Library/Python/2.7/site-packages/google/cloud/_http.py”,第 303 行,在 api_request error_info=method + ' ' + url) google.cloud.exceptions.ServiceUnavailable: 503 (GET @ 987654323@)
  • 这很奇怪。这意味着 BigQuery 没有响应请求。也许如果你稍后再试,它会起作用的。
  • 是的,我认为系统出现了故障。它现在工作但是我遇到了另一个错误。我在代码中写为 job1... 并将其插入到我的“load_data_from_GCS”函数中,但现在它说这个工作没有定义。如何正确定义 job1 以截断表'test12'
  • 确实没有job1 的定义。只需使用 job 变量并在 source_format 更新后添加 WRITE_TRUNCATE 行。之后,您可以运行 begin() 方法。我建议还花一些时间learning the basics of python,因为学习曲线往往相当容易。
猜你喜欢
  • 2018-09-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多