【问题标题】:Apache airflow backfill automatic dateApache气流回填自动日期
【发布时间】:2018-10-18 12:46:03
【问题描述】:

我有一个 DAG,我想用它来回填我的数据库表。

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 1),
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(dag_id='airflow_backfill', default_args=args, schedule_interval='@daily')

"""
    Task for inserting data per day
"""

task1 = PostgresOperator(
    task_id='insert_new_row',
    postgres_conn_id='aws_pg',
    sql="INSERT INTO airflow_test(date_at) VALUES('2018-04-01')",
    dag=dag,
)

task2 = PostgresOperator(
    task_id='update_team_name',
    postgres_conn_id='aws_pg',
    sql="UPDATE airflow_test SET team_name = (SELECT team_name FROM teams ORDER BY RANDOM() LIMIT 1) WHERE team_name is NULL",
    dag=dag,
)

task1.set_downstream(task2)

我从 2018 年 4 月 1 日开始在数据库中插入一行,但问题是我将 date_at 变量硬编码。

我的问题是,有什么方法可以将回填日期作为插入的值?我想在回填时自动设置“date_at”的值,但没有找到可以自动获取回填日期的气流环境/配置变量。

我正在使用 apache 气流 1.9.0。谢谢。

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    已编辑:您应该能够使用 jinja 模板来获取变量 execution_date:

    task1 = PostgresOperator(
        task_id='insert_new_row',
        postgres_conn_id='aws_pg',
        sql="INSERT INTO airflow_test(date_at) VALUES('{{ ds }}')",
        dag=dag,
    )
    

    https://airflow.apache.org/code.html#default-variables

    【讨论】:

    • 我推荐使用{{ ds }},因为它会根据要求将输出格式化为YYYY-MM-DDexecution_date 是一个日期时间,我不太确定格式。
    • 我也在想同样的事情,但不是 {{ ds }} 今天的日期时间戳吗?对于执行日期,您可以使用 {{ execution_date }}.strftime(%Y-%m-%d)??
    • 来自引用页面:{{ ds }}:执行日期为YYYY-MM-DD
    • 谢谢。知道这是可能的。没有过多地尝试过 Jinja 模板。编辑了答案以反映您的意见。
    猜你喜欢
    • 2017-02-14
    • 2021-03-19
    • 1970-01-01
    • 1970-01-01
    • 2016-11-11
    • 2021-04-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-02
    相关资源
    最近更新 更多