【问题标题】:Airflow how to set default values for dag_run.confAirflow 如何为 dag_run.conf 设置默认值
【发布时间】:2021-10-01 21:45:34
【问题描述】:

我正在尝试设置一个 Airflow DAG,它提供可从 dag_run.conf 获得的默认值。这在使用“Run w/ Config”选项从 webUI 运行 DAG 时效果很好。但是,当按计划运行时,dag_run.conf 字典不存在,任务将失败,例如

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'

以下是一个示例作业。

是否有可能使dag_run.conf 始终包含params 定义的字典?

from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta

def do_something(val1: str, val2: str) -> str:
    return f'echo vars are: "{val1}, {val2}"'

params = {
    'key1': 'def1',
    'key2': 'def2',        
}

default_args = {
    'retries': 0,
}

with DAG(
    'template_test',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    start_date=hours_ago(1),
    params = params,
) as dag:

    hello_t = BashOperator(
        task_id='example-command',
        bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
        config=params,
    )

我见过的最接近的是For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?,但是它们利用了 Jinja 和 if/else - 这需要定义这些默认参数两次。我只想定义一次。

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您可以使用 DAG params 来实现您正在寻找的内容:

    params (dict) – DAG 级别参数的字典,可在模板中访问,命名空间在 params 下。这些参数可以在任务级别被覆盖。

    您可以在 DAG 或任务级别定义 params,也可以在 Trigger DAG w/config 部分的 UI 中添加或修改它们。

    DAG 示例:

    default_args = {
        "owner": "airflow",
    }
    
    dag = DAG(
        dag_id="example_dag_params",
        default_args=default_args,
        schedule_interval="*/5 * * * *",
        start_date=days_ago(1),
        params={"param1": "first_param"},
        catchup=False,
    )
    with dag:
    
        bash_task = BashOperator(
            task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
        )
    

    输出日志

    [2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
    [2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
    AIRFLOW_CTX_DAG_OWNER=***
    AIRFLOW_CTX_DAG_ID=example_dag_params
    AIRFLOW_CTX_TASK_ID=bash_task
    AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
    AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
    [2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location: 
     /tmp
    [2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
    [2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
    [2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
    [2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0
    

    从日志中,请注意dag_run计划,并且参数仍然存在。

    您可以在this answer 中找到更多关于使用参数的示例。

    希望对你有用!

    【讨论】:

    • 这行得通,谢谢!奇怪的是,触发页面和文档建议使用dag_run.conf,恕我直言,这显然是一种更明智的模式
    猜你喜欢
    • 2018-06-20
    • 2011-08-31
    • 2020-12-14
    • 2012-05-02
    • 2017-12-26
    • 1970-01-01
    • 2020-11-25
    • 2012-08-19
    • 1970-01-01
    相关资源
    最近更新 更多