【问题标题】:Airflow DAG: How to insert data into a table using Python operator, not BigQuery operator?Airflow DAG:如何使用 Python 运算符而不是 BigQuery 运算符将数据插入表中?
【发布时间】:2022-11-17 22:54:57
【问题描述】:

我正在尝试使用简单的 Python 运算符而不是 BigQuery 运算符将一些数据插入表中,但我不确定如何实现它。我正在尝试以 Airflow DAG 的形式实现它。

我编写了一个简单的 DAG,并且设法使用以下方法将数据从 GCS Bucket 插入到 BigQuery,但我想使用 Python 运算符而不是 BigQuery 来执行此操作:

load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

我想使用简单的 Python 运算符而不是 BigQuery 来实现上述目标。

BQ 到 GCS: BigQuery 到 GCS:

# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'
project = "bigquery-public-data"
dataset_id = "samples"
table_id = "shakespeare"

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)

【问题讨论】:

  • 你为什么要使用 PythonOperator 而不是 GCSToBigQueryOperator
  • 我想使用 PythonOperator 完成同样的任务。我只需要帮助来编写我的代码,而是使用 PythonOperator 代替。这只是为了扩展我的知识。

标签: python sql google-cloud-storage airflow directed-acyclic-graphs


【解决方案1】:

您可以在PythonOperator中使用BigQueryPythonclientGCS文件插入BigQuery,示例:

PythonOperator(
    task_id="gcs_to_bq",
    op_kwargs={
      'dataset': 'dataset',
      'table': 'table'
    },
    python_callable=load_gcs_files_to_bq
)

def load_gcs_files_to_bq(dataset, table):
   from google.cloud import bigquery

   # Construct a BigQuery client object.
   client = bigquery.Client()

   # TODO(developer): Set table_id to the ID of the table to create.
   table_id = f"your-project.{dataset}.{table}"

   job_config = bigquery.LoadJobConfig(
        schema=[
             bigquery.SchemaField("name", "STRING"),
             bigquery.SchemaField("post_abbr", "STRING"),
        ],
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )
    
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print("Loaded {} rows.".format(destination_table.num_rows))

【讨论】:

  • 感谢您抽出宝贵时间为我的回答提供解决方案。我假设我可以将其合并到我的 Airflow DAG 中,对吗?
  • 不客气,你必须稍微调整一下这段代码,例如设置你的projectIddatasettable。还要检查 GCS uri 并设置您的路径。 BigQuery 模式似乎没问题。
  • 非常感谢。是的,我知道我必须改变一些东西,但我绝对可以把它放到我的 DAG 中。
  • 不客气:),是的,你可以。不要犹豫,添加投票以提高我的答案的可见度并帮助他人。
  • 抱歉,我没有足够的代表来投票。
猜你喜欢
  • 2021-02-10
  • 1970-01-01
  • 1970-01-01
  • 2020-11-08
  • 2019-07-07
  • 2012-12-05
  • 2018-12-02
  • 1970-01-01
  • 2021-02-26
相关资源
最近更新 更多