【Question Title】: How to add a new column with a metadata value to a CSV when loading it into BigQuery
【Posted】: 2020-01-17 07:53:17
【Question Description】:

I have a daily CSV file arriving in my bucket on Google Cloud Storage, and I have built a function that loads this CSV and appends it to a table in BigQuery as it comes in. However, before I load the data into BigQuery, I would like to add a new column to the CSV containing the function's execution id (context["id"]).

Is this possible?

Thanks in advance!

from google.cloud import bigquery


def TimeTableToBigQuery(data, context):
    # Get metadata about the uploaded file: the bucket, name and insert datetime
    execution_id = context['event_id']
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']
    pathtofile = data["id"].rsplit("/", 2)
    # parent_folder = data["id"].rsplit("/", 3)
    file = str(pathtofile[1])
    name = file.split('---')
    dates = name[0].split('_', 1)
    arrivedat = str(dates[1])
    path = pathtofile[0]
    # parent_folder = parent_folder[1]

    # Configure the load job before sending it to BigQuery
    client = bigquery.Client()
    dataset_id = 'nature_bi'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'  # no trailing comma: a comma here would assign a tuple
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.write_disposition = 'WRITE_TRUNCATE'  # no trailing comma here either
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = [
        bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('RowHashKey', 'STRING', mode='NULLABLE', description='to be written in BQ'),
        bigquery.SchemaField('MergeState', 'INTEGER', mode='NULLABLE', description='for merging data in BQ'),
        bigquery.SchemaField('SourceName', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('SourceScript', 'STRING', mode='NULLABLE', description='Name of the loading script'),
        bigquery.SchemaField('ArriveDateTime', 'STRING', mode='NULLABLE', description='Arrival time parsed from the filename'),
        bigquery.SchemaField('InsertDateTime', 'STRING', mode='NULLABLE', description='Time of the load'),
        bigquery.SchemaField('ExecutionID', 'STRING', mode='NULLABLE', description='Cloud Function event id')
    ]

    # Build the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (uri, timeCreated))
    tablename = 'employee_time'
    table_id = dataset_ref.table(tablename)

    # Send the job we configured above to load the file into BigQuery
    load_job = client.load_table_from_uri(
        uri,
        table_id,
        job_config=job_config)
    # Log some information so we can track the job
    print('Starting job with ID {}'.format(load_job.job_id))
    print('File: {}'.format(data['name']))
    load_job.result()  # wait for the table load to complete
    print('Job finished.')
    destination_table = client.get_table(dataset_ref.table(tablename))
    print('Loaded {} rows.'.format(destination_table.num_rows))

【Question Discussion】:

    Tags: python-3.x google-cloud-platform google-bigquery google-cloud-functions


    【Solution 1】:

    You have three ways to achieve this:

    • Rewrite the file
      • Read the file line by line
      • Append the fields you want to each line
      • Write the result to a local file (the /tmp directory is available; it lives in memory and is limited by the function's memory size)
      • Then load this file into your table
    • If you want to keep the existing data (WRITE_APPEND)
      • Load the file as-is into a temporary table
      • Wait for the end of the load job
      • Run a query like INSERT INTO <your table> SELECT *, CURRENT_TIMESTAMP() AS InsertDateTime, <your executionId> AS ExecutionId FROM <temp table>
      • Then delete the temporary table (or create it in a dataset with a 1-day default table expiration). Note, however, that a function can run for at most 9 minutes; if your file is large, doing all of this in a single function may take a while. You can build something more sophisticated (I can elaborate if you want). In addition, the query reads all of the temporary data to write it into the final table, which can incur costs if you have a lot of data.
    • If you use WRITE_TRUNCATE (as in your code sample), you can do something smarter:

      • Delete the previously existing table
      • Load the file into a table named something like nature_bi_<insertDate>_<executionId>
      • When querying, inject the table name into the query result (here I simply add the table name, but with a UDF or native BigQuery functions you can extract the date and the executionId)

    SELECT *,
           (SELECT table_id
            FROM `<project>.<dataset>.__TABLES_SUMMARY__`
            WHERE table_id LIKE 'nature_bi%')
    FROM `<project>.<dataset>.nature_bi*`
    LIMIT 1000
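    The first approach (rewriting the file line by line) can be sketched as follows. `add_metadata_columns` is a hypothetical helper name, not part of the original code; in the real function you would first download the blob to /tmp, rewrite it, and load the rewritten file instead of the original.

```python
import csv
import io


def add_metadata_columns(csv_text, extra_values, delimiter=';'):
    """Append the same extra values to every data row of a CSV string.

    The header row gets the extra column names; every subsequent row
    gets the corresponding values. `extra_values` maps column name -> value.
    """
    reader = csv.reader(io.StringIO(csv_text), delimiter=delimiter)
    out = io.StringIO()
    writer = csv.writer(out, delimiter=delimiter, lineterminator='\n')
    for i, row in enumerate(reader):
        if i == 0:
            writer.writerow(row + list(extra_values.keys()))    # header row
        else:
            writer.writerow(row + list(extra_values.values()))  # data row
    return out.getvalue()


# In the Cloud Function, after downloading the file from GCS to /tmp:
#   rewritten = add_metadata_columns(text, {'ExecutionID': execution_id})
# then write `rewritten` back to /tmp (or GCS) and load that file.
```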

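    For the second approach, the INSERT … SELECT statement can be built as a string and run with `client.query(...)`. This is only a sketch: `build_append_query` and the table names are hypothetical placeholders, and a production version should not interpolate untrusted values into SQL.

```python
def build_append_query(project, dataset, temp_table, target_table, execution_id):
    """Build the statement that copies the temp table into the final table,
    adding the two metadata columns along the way."""
    return (
        "INSERT INTO `{p}.{d}.{target}` "
        "SELECT *, CURRENT_TIMESTAMP() AS InsertDateTime, "
        "'{eid}' AS ExecutionID "
        "FROM `{p}.{d}.{temp}`"
    ).format(p=project, d=dataset, target=target_table,
             temp=temp_table, eid=execution_id)


# After the load job into the temp table has finished, you would run:
#   client.query(build_append_query(...)).result()
#   client.delete_table(temp_table_ref)   # or rely on a 1-day expiration
```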
    All of these solutions work; which to pick depends on your constraints and your file size.
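    For the third approach, the date and executionId embedded in the table name can be recovered afterwards. A minimal sketch, assuming the hypothetical naming convention nature_bi_<insertDate>_<executionId> (e.g. nature_bi_20200117_123456789); the same split could be done in SQL with REGEXP_EXTRACT instead of a UDF:

```python
import re

# Pattern for the assumed naming convention nature_bi_<insertDate>_<executionId>
TABLE_NAME_RE = re.compile(r'^nature_bi_(\d{8})_(\w+)$')


def parse_table_name(table_id):
    """Return (insert_date, execution_id) from a table name, or None
    if the name does not follow the convention."""
    match = TABLE_NAME_RE.match(table_id)
    if not match:
        return None
    return match.group(1), match.group(2)
```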

    【Discussion】:

    • There is so much knowledge behind this answer! Thank you very much for the detailed explanation of the solution to my problem @guillaume