【发布时间】:2018-10-18 12:46:03
【问题描述】:
我有一个 DAG,我想用它来回填我的数据库表。
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 4, 1),
'retry_delay': timedelta(minutes=1),
}
dag = DAG(dag_id='airflow_backfill', default_args=args, schedule_interval='@daily')
"""
Task for inserting data per day
"""
task1 = PostgresOperator(
task_id='insert_new_row',
postgres_conn_id='aws_pg',
sql="INSERT INTO airflow_test(date_at) VALUES('2018-04-01')",
dag=dag,
)
task2 = PostgresOperator(
task_id='update_team_name',
postgres_conn_id='aws_pg',
sql="UPDATE airflow_test SET team_name = (SELECT team_name FROM teams ORDER BY RANDOM() LIMIT 1) WHERE team_name is NULL",
dag=dag,
)
task1.set_downstream(task2)
我从 2018 年 4 月 1 日开始在数据库中插入一行,但问题是我将 date_at 变量硬编码。
我的问题是,有什么方法可以将回填日期作为插入的值?我想在回填时自动设置“date_at”的值,但没有找到可以自动获取回填日期的气流环境/配置变量。
我正在使用 apache 气流 1.9.0。谢谢。
【问题讨论】: