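Great answer
Too much? Anyway.
Many of the other answers are a bit square-peg-round-hole: they add complicated new operators, abuse built-in variables, or don't quite answer the question. I wasn't particularly happy with any of them, because they either hide their behaviour when viewed through the web UI, are prone to breaking, or require a lot of custom code (which is also prone to breaking).
This solution uses built-in functionality, requires no new operators and only a limited amount of additional code, keeps the DAGs visible in the UI without any tricks, and follows Airflow best practice (see idempotency).
The solution to this problem is fairly complicated, so I've split it into several parts:
- How to safely trigger a dynamic number of tasks
- How to wait for all of these tasks to finish and then call a final task
- How to integrate this into your task pipeline
- Limitations (nothing is perfect)
Can a task trigger a dynamic number of other tasks?
Yes. Sort of. Without writing any new operators, you can have a DAG trigger a dynamic number of other DAGs using only built-in operators. This can then be extended so that a DAG depends on a dynamic number of other DAGs (see the section on waiting for tasks to finish). This is similar to flinz's solution, but more robust and with much less custom code.
This is done with a BranchPythonOperator that selectively triggers 2 other TriggerDagRunOperators. One of these recursively re-calls the current DAG; the other calls an external DAG, the target function.
An example config that can be used to trigger the DAG is given at the top of recursive_dag.py.
print_conf.py (an example DAG to trigger)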
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )
recursive_dag.py (where the magic happens)
"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""
from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'

def choose_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    # dump to string because value is stringified into
    # template string, is then parsed.
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]

    return next_task_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
    return add_braces(pull)

with DAG(
    dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id),
        conf=make_templated_pull(next_task_conf)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=dag_id,
        conf=make_templated_pull(repeat_task_conf)
    )

    branch >> [trigger_next, trigger_repeat]
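The advantage of this solution is that it uses very little custom code. flinz's solution can fail part way through, leaving some tasks scheduled and others not. On retry, DAGs may then either be scheduled to run twice, or fail on the first DAG, leaving work partially done by a failed task. This approach tells you which DAGs failed to trigger and retries only those. This approach is therefore idempotent, and the other is not.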
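To make the recursion easier to follow, here is a minimal sketch in plain Python (no Airflow) of what each run of the recursive DAG does with its config. The names `split_run` and `unroll` are made up for this illustration; `split_run` mirrors the head/tail split in `choose_branch`, and `unroll` follows the chain of recursive runs to the end. Because each run is responsible for exactly one target trigger, a failure maps to one identifiable entry.

```python
def split_run(conf):
    """Peel off the first target, as choose_branch does for one DAG run.

    Returns (dag_id, dag_conf, remaining_conf). remaining_conf is None
    when there is nothing left to recurse on.
    """
    task_list = conf['task_list']
    conf_list = conf['conf_list']
    remaining = None
    if task_list[1:]:
        remaining = {'task_list': task_list[1:], 'conf_list': conf_list[1:]}
    return task_list[0], conf_list[0], remaining


def unroll(conf):
    """Follow the recursion to the end and list every trigger in order."""
    triggered = []
    while conf is not None:
        dag_id, dag_conf, conf = split_run(conf)
        triggered.append((dag_id, dag_conf))
    return triggered


example = {
    'task_list': ['print_output', 'print_output'],
    'conf_list': [{'output': 'Hello'}, {'output': 'world!'}],
}
print(unroll(example))
```

In the real DAG the loop in `unroll` is replaced by `trigger_repeat` starting a new run of the same DAG with the remaining config.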
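Can a DAG depend on a dynamic number of other DAGs?
Yes, but... This is easy to do if the tasks don't run in parallel. Running in parallel is more complicated.
To run in sequence, the important changes are: use wait_for_completion=True in trigger_next, use a Python operator to set up the xcom values before "trigger_next", and add a branch operator that either enables or disables the repeat task. That gives a linear dependence:
setup_xcom >> trigger_next >> branch >> trigger_repeat
To run in parallel, you can similarly recursively chain several ExternalTaskSensors that use templated external_dag_id values together with the timestamps of the triggered DAG runs. To know the timestamp of a triggered DAG run, trigger it using the timestamp of the triggering DAG run. These sensors then wait, one by one, for all of the created DAG runs to complete, and then trigger a final DAG. The code is below; this time I've added a random sleep to the print output DAG, so that the wait DAGs actually do some waiting.
Note: recurse_wait_dag.py now defines 2 DAGs, and both need to be enabled for this to work.
An example config that can be used to trigger the DAG is given at the top of recurse_wait_dag.py.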
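The bookkeeping behind the parallel wait can be sketched in plain Python, without Airflow. This is only an illustration under assumptions: `plan_triggers` and `wait_for_all` are hypothetical names, and the one-second spacing between runs stands in for whatever timestamps the scheduler actually assigns. The point is that each (DAG id, timestamp) pair identifies one triggered run, and the wait list is consumed one entry at a time.

```python
from datetime import datetime, timedelta, timezone


def plan_triggers(task_list, first_ts, step=timedelta(seconds=1)):
    """Pair each target DAG id with the timestamp of the run that triggers it.

    The fixed step is an assumption for the demo; real timestamps come
    from the recursive DAG runs themselves.
    """
    return [
        (dag_id, (first_ts + i * step).isoformat())
        for i, dag_id in enumerate(task_list)
    ]


def wait_for_all(tasks_to_wait, finished):
    """Consume the wait list head first, one entry per simulated wait run."""
    waited = []
    remaining = list(tasks_to_wait)
    while remaining:
        head, remaining = remaining[0], remaining[1:]
        if head not in finished:
            raise RuntimeError(f'run {head} has not completed')
        waited.append(head)
    return waited


plan = plan_triggers(
    ['print_output', 'print_output'],
    datetime(2021, 1, 1, tzinfo=timezone.utc),
)
print(wait_for_all(plan, finished=set(plan)))
```

Two runs of the same target DAG only stay distinguishable because their timestamps differ, which is why the same DAG can appear in the list more than once.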
print_conf.py (modified to add a random sleep)
"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""
from datetime import timedelta
from time import sleep
from random import randint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    sleep_time = randint(15,30)
    print(f'sleeping for time: {sleep_time}')
    sleep(sleep_time)
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )
recurse_wait_dag.py (where even more magic happens)
"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
    "final_task": "print_output",
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""
from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone

# common.py is not shown in this answer; presumably it holds the
# two-argument make_templated_pull(key, task_id) defined in trigger_recurse.py
from common import make_templated_pull

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

def to_conf(id):
    return f'{id}_conf'

def to_execution_date(id):
    return f'{id}_execution_date'

def to_ts(id):
    return f'{id}_ts'

recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)

wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)

def choose_branch(task_instance, dag_run, ts):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    # can't have multiple dag runs of same DAG with same timestamp
    assert next_task != recurse_dag_id
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]
    triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)
    task_instance.xcom_push(key=next_task_execution_date, value=ts)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs,
            'triggered_tasks': triggered_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]

    end_conf = json.dumps({
        'tasks_to_wait': triggered_tasks,
        'final_task': dag_conf['final_task']
    })
    task_instance.xcom_push(key=end_task_conf, value=end_conf)
    return [next_task_id, end_task_id]

def choose_wait_target(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']
    next_task, next_ts = tasks_to_wait[0]
    later_tasks = tasks_to_wait[1:]
    task_instance.xcom_push(key=next_wait_id, value=next_task)
    task_instance.xcom_push(key=next_wait_ts, value=next_ts)

    if later_tasks:
        repeat_conf = json.dumps({
            'tasks_to_wait': later_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)

def execution_date_fn(_, task_instance):
    date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
    return timezone.parse(date_str)

def choose_wait_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']

    if len(tasks_to_wait) == 1:
        return end_task_id

    return repeat_task_id

with DAG(
    recurse_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as recursive_dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id, branch_id),
        execution_date=make_templated_pull(next_task_execution_date, branch_id),
        conf=make_templated_pull(next_task_conf, branch_id)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(repeat_task_conf, branch_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(end_task_conf, branch_id)
    )

    branch >> [trigger_next, trigger_repeat, trigger_end]

with DAG(
    wait_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as wait_dag:
    py_operator = PythonOperator(
        task_id=choose_wait_id,
        python_callable=choose_wait_target
    )

    sensor = ExternalTaskSensor(
        task_id='do_wait',
        external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
        execution_date_fn=execution_date_fn
    )

    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_wait_branch
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(repeat_task_conf, choose_wait_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
    )

    py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
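Since recurse_wait_dag.py imports make_templated_pull from a module that isn't listed here, it may help to see what that helper produces. The sketch below assumes the helper is the same two-argument version that trigger_recurse.py defines; it only builds a Jinja template string, which Airflow renders later when the operator runs.

```python
def add_braces(in_string):
    # Wrap an expression in Jinja delimiters
    return '{{' + in_string + '}}'


def make_templated_pull(key, task_id):
    # Build a template that pulls one xcom value pushed by task_id
    pull = f"ti.xcom_pull(key='{key}', task_ids='{task_id}')"
    return add_braces(pull)


template = make_templated_pull('next_wait', 'choose_wait')
print(template)
# prints: {{ti.xcom_pull(key='next_wait', task_ids='choose_wait')}}
```

Nothing is pulled from xcom at DAG definition time. The string is just a placeholder in a templated operator field, which is why values pushed by the upstream task are available when it is resolved.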
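Integrating with your code
That's great, but you actually want to use it. So what do you need to do? The question includes an example that tries to do the following: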
             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|
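To achieve the goal in the question (example implementation below), you need to separate tasks A, B and C into their own DAGs. Then, at the end of DAG A, add a new operator that triggers the DAG 'recurse_then_wait' above. Pass this DAG a config that includes the config needed for each B DAG run, as well as the B DAG id (this can easily be changed to use different DAGs, go nuts). Then include the name of DAG C, the final DAG, which runs at the end. The config should look like this: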
{
    "final_task": "C_DAG",
    "task_list": ["B_DAG","B_DAG"],
    "conf_list": [
        {
            "b_number": 1,
            "more_stuff": "goes_here"
        },
        {
            "b_number": 2,
            "foo": "bar"
        }
    ]
}
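Because the number of B runs is only known at runtime, DAG A will usually build this config in code rather than write it by hand. A minimal sketch, assuming a hypothetical helper named `build_fanout_conf` that repeats one B DAG id once per B config:

```python
import json


def build_fanout_conf(b_dag_id, b_confs, final_dag_id):
    """Build the config for 'recurse_then_wait' from a list of B configs."""
    b_confs = list(b_confs)
    if not b_confs:
        # choose_branch reads task_list[0], so an empty list would fail there
        raise ValueError('need at least one B config')
    return {
        'final_task': final_dag_id,
        'task_list': [b_dag_id] * len(b_confs),
        'conf_list': b_confs,
    }


conf = build_fanout_conf(
    'B_DAG',
    [{'b_number': n} for n in range(1, 4)],
    'C_DAG',
)
print(json.dumps(conf, indent=4))
```

The resulting dict has the same shape as the JSON above, with one task_list entry per conf_list entry.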
Once implemented, it should look something like this:
trigger_recurse.py
from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

from recurse_wait_dag import recurse_dag_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key, task_id):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
    return add_braces(pull)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'

def setup_trigger_conf(task_instance):
    trigger_conf = {
        'final_task': 'print_output',
        'task_list': ['print_output','print_output'],
        'conf_list': [
            {
                'output': 'Hello'
            },
            {
                'output': 'world!'
            }
        ]
    }

    print('Triggering the following tasks')
    for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
        print(f' task: {task} with config {json.dumps(conf)}')
    print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')

    task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))

with DAG(
    'trigger_recurse_example',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    py_operator = PythonOperator(
        task_id=setup_trigger_conf_id,
        python_callable=setup_trigger_conf
    )

    trigger_operator = TriggerDagRunOperator(
        task_id='trigger_call_and_wait',
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
    )

    py_operator >> trigger_operator
All of this ends up looking something like the following, with the vertical and horizontal lines showing where one DAG triggers another:
A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C
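Limitations
Tasks are no longer visible on a single graph. This is probably the biggest problem with this approach. By adding tags to all associated DAGs, the DAGs can at least be viewed together. Relating multiple parallel runs of DAG B to runs of DAG A is still messy. However, since a single DAG run shows its input config, each DAG B run depends only on its input config and not on DAG A, so this relationship can be at least partially ignored.
Tasks can no longer communicate using xcom. The B tasks can receive input from task A via the DAG config, but task C cannot get output from the B tasks. The results of all the B tasks should be put in a known location and then read by task C.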
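One way to picture the "known location" workaround is a directory of result files, one per B run, that C reads back. This is a sketch under assumptions: the helper names and the `b_<number>.json` naming are invented for the example, and a temporary directory stands in for storage that every worker can reach (a shared volume, object store or database in a real deployment).

```python
import json
import tempfile
from pathlib import Path


def write_b_result(results_dir, b_number, result):
    """Called at the end of a B run: store its result under a predictable name."""
    path = Path(results_dir) / f'b_{b_number}.json'
    path.write_text(json.dumps(result))
    return path


def read_all_b_results(results_dir):
    """Called by C: collect every B result found in the known location."""
    results = {}
    for path in sorted(Path(results_dir).glob('b_*.json')):
        results[path.stem] = json.loads(path.read_text())
    return results


with tempfile.TemporaryDirectory() as results_dir:
    write_b_result(results_dir, 1, {'rows': 10})
    write_b_result(results_dir, 2, {'rows': 25})
    combined = read_all_b_results(results_dir)

print(combined)
```

The location itself could be passed to each B run through its entry in conf_list, so A decides where the results go.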
The config argument to 'recurse_then_wait' could probably be improved by combining task_list and conf_list, but the current form solves the problem as stated.
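As a rough idea of what a combined format could look like, the sketch below takes a single list of entries and converts it back into the two parallel lists the DAG currently expects. The `runs` layout, its `dag_id` and `conf` keys, and the `split_combined` name are all hypothetical; nothing in the DAGs above reads this format yet.

```python
def split_combined(runs):
    """Convert [{'dag_id': ..., 'conf': ...}, ...] into task_list and conf_list."""
    return {
        'task_list': [run['dag_id'] for run in runs],
        # A missing 'conf' falls back to an empty config
        'conf_list': [run.get('conf', {}) for run in runs],
    }


runs = [
    {'dag_id': 'B_DAG', 'conf': {'b_number': 1}},
    {'dag_id': 'B_DAG', 'conf': {'b_number': 2}},
    {'dag_id': 'OTHER_DAG'},
]
print(split_combined(runs))
```

Keeping each DAG id next to its config makes it harder for the two lists to drift out of step.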
There is no config for the final DAG. That should be trivial to solve.