【问题标题】:How to transform data before loading into BigQuery in Apache Airflow?如何在 Apache Airflow 中加载到 BigQuery 之前转换数据?
【发布时间】:2021-09-19 11:16:42
【问题描述】:

我是 Apache Airflow 的新手。我的任务是从 Google Cloud Storage 读取数据,转换数据并将转换后的数据上传到 BigQuery 表中。我能够从 Cloud Storage 存储桶中获取数据并将其直接存储到 BigQuery 表中。我不确定如何在此管道中包含转换函数。

这是我的代码:

# Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Default Argument
default_args = {
    'owner': <OWNER_NAME>,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

# DAG Definition
dag = DAG('load_from_bucket_to_bq',
schedule_interval='0 * * * *',
default_args=default_args)

# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>

with dag:
    # Tasks
    start = DummyOperator(
        task_id='start'
    )

    upload = GoogleCloudStorageToBigQueryOperator(
        task_id='load_from_bucket_to_bigquery',
        bucket=<BUCKET_NAME>,
        source_objects=['*.csv'],
        schema_fields=[
            {'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        destination_project_dataset_table=BQ_PROJECT + '.' + BQ_DATASET + '.' + <TABLE_NAME>,
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id=BQ_CONN_ID,
        bigquery_conn_id=BQ_CONN_ID,
        dag = dag
    )

    end = DummyOperator(
        task_id='end'
    )

    # Setting Dependencies
    start >> upload >> end

感谢任何有关如何进行的帮助。谢谢。

【问题讨论】:

  • 你会做什么样的变换?
  • 我每天都会收到 COVID 病例数据。我想每天处理不同的案例并将其存储在 BigQuery 表中。
  • 您可以尝试创建一个执行转换的 python 函数并在您的 DAG 上使用 PythonOperator 在运行时调用该函数。
  • 是否有操作员可以从云存储桶中获取数据并在 Python 函数中使用?没找到,所以直接用了GoogleCloudStorageToBigQueryOperator。
  • GCSToLocalFilesystemOperator 是唯一从 GCP 存储桶下载数据的算子吗?

标签: google-cloud-platform google-bigquery airflow


【解决方案1】:

发布与@sachinmb27 的对话作为答案。转换可以放在 python 函数中,并使用PythonOperator 在运行时调用转换函数。有关 Airflow 中可以使用哪些运算符的更多详细信息,请参阅Airflow Operator docs

【讨论】:

    猜你喜欢
    • 2021-03-26
    • 2019-08-19
    • 2023-04-05
    • 2019-08-16
    • 1970-01-01
    • 2019-04-22
    • 1970-01-01
    • 2017-12-26
    • 1970-01-01
    相关资源
    最近更新 更多