【问题标题】:Bigquery : Create table if not exist and load data using Python and Apache AirFlowBigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据
【发布时间】:2019-08-16 09:34:01
【问题描述】:

首先,我使用 MySQL 查询从生产数据库中获取所有数据,然后将该数据作为 NEW LINE DELIMITED JSON 存储在谷歌云存储中,我想要做的是:
1。检查表是否存在
2。如果表不存在,则使用自动检测模式创建表
3。存储数据

所有这些都将安排在气流中。真正让我困惑的是数字2,我怎样才能在 Python 中做到这一点?还是 Airflow 可以自动执行此操作?

【问题讨论】:

    标签: python mysql json google-bigquery airflow


    【解决方案1】:

    Airflow 可以自动执行此操作。 create_disposition 参数在需要时创建表。 autodetect 参数正是您所需要的。这适用于 Airflow 1.10.2

    GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq',
        bucket='test_bucket',
        source_objects=['folder1/*.csv', 'folder2/*.csv'],
        destination_project_dataset_table='dest_table',
        source_format='CSV',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='bq-conn',
        google_cloud_storage_conn_id='gcp-conn',
        autodetect=True, # This uses autodetect
        dag=dag
    )
    

    【讨论】:

    • 我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点??
    【解决方案2】:

    在 BigQuery 命令行中,如果您的 json 文件位于 GCS 上,则 Loading JSON data with schema auto-detection 在一个命令中为您执行 2 + 3。

    查看 AirFlow 文档,GoogleCloudStorageToBigQueryOperator 似乎在做同样的事情。我检查了它的source,它只是调用 BigQuery load api。我相信它会做你想做的。

    当不清楚每个参数的含义时,您可以使用参数名称搜索BigQuery Jobs api

    例如,要在您的任务列表中实现 1,您只需指定:

    write_disposition (string) – 如果表已存在,则写入配置。

    但为了知道您需要作为 write_disposition 传递的字符串,您必须在 BigQuery 文档中进行搜索。

    【讨论】:

    • 我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点??
    猜你喜欢
    • 2018-11-07
    • 2021-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-04
    • 2023-04-05
    • 1970-01-01
    相关资源
    最近更新 更多