【发布时间】:2021-10-01 21:45:34
【问题描述】:
我正在尝试设置一个 Airflow DAG,它提供可从 dag_run.conf 获得的默认值。这在使用“Run w/ Config”选项从 webUI 运行 DAG 时效果很好。但是,当按计划运行时,dag_run.conf 字典不存在,任务将失败,例如
jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
以下是一个示例作业。
是否有可能使dag_run.conf 始终包含params 定义的字典?
from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta
def do_something(val1: str, val2: str) -> str:
return f'echo vars are: "{val1}, {val2}"'
params = {
'key1': 'def1',
'key2': 'def2',
}
default_args = {
'retries': 0,
}
with DAG(
'template_test',
default_args=default_args,
schedule_interval=timedelta(minutes=1),
start_date=hours_ago(1),
params = params,
) as dag:
hello_t = BashOperator(
task_id='example-command',
bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
config=params,
)
我见过的最接近的是For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?,但是它们利用了 Jinja 和 if/else - 这需要定义这些默认参数两次。我只想定义一次。
【问题讨论】:
标签: airflow