【发布时间】:2021-09-19 11:16:42
【问题描述】:
我是 Apache Airflow 的新手。我的任务是从 Google Cloud Storage 读取数据,转换数据并将转换后的数据上传到 BigQuery 表中。我能够从 Cloud Storage 存储桶中获取数据并将其直接存储到 BigQuery 表中。我不确定如何在此管道中包含转换函数。
这是我的代码:
# Import libraries needed for the operation
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# Default Argument
default_args = {
'owner': <OWNER_NAME>,
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=2),
}
# DAG Definition
dag = DAG('load_from_bucket_to_bq',
schedule_interval='0 * * * *',
default_args=default_args)
# Variable Configurations
BQ_CONN_ID = <CONN_ID>
BQ_PROJECT = <PROJECT_ID>
BQ_DATASET = <DATASET_ID>
with dag:
# Tasks
start = DummyOperator(
task_id='start'
)
upload = GoogleCloudStorageToBigQueryOperator(
task_id='load_from_bucket_to_bigquery',
bucket=<BUCKET_NAME>,
source_objects=['*.csv'],
schema_fields=[
{'name': 'Active_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Last_Update', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'New_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'New_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Cases', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Deaths', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'Total_Recovered', 'type': 'STRING', 'mode': 'NULLABLE'},
],
destination_project_dataset_table=BQ_PROJECT + '.' + BQ_DATASET + '.' + <TABLE_NAME>,
write_disposition='WRITE_TRUNCATE',
google_cloud_storage_conn_id=BQ_CONN_ID,
bigquery_conn_id=BQ_CONN_ID,
dag = dag
)
end = DummyOperator(
task_id='end'
)
# Setting Dependencies
start >> upload >> end
感谢任何有关如何进行的帮助。谢谢。
【问题讨论】:
-
你会做什么样的变换?
-
我每天都会收到 COVID 病例数据。我想每天处理不同的案例并将其存储在 BigQuery 表中。
-
您可以尝试创建一个执行转换的 python 函数并在您的 DAG 上使用 PythonOperator 在运行时调用该函数。
-
是否有操作员可以从云存储桶中获取数据并在 Python 函数中使用?没找到,所以直接用了GoogleCloudStorageToBigQueryOperator。
-
GCSToLocalFilesystemOperator 是唯一从 GCP 存储桶下载数据的算子吗?
标签: google-cloud-platform google-bigquery airflow