如果我理解正确,你的条件是:
- 每天保持运行DAG_A
- 每天运行 DAG_B n 次
- 每次 DAG_B 运行时,它都会等待 DAG_A__Task_1 完成
我认为您可以通过指示 ExternalTaskSensor 等待 DAG_A 的期望执行日期来轻松调整当前设计。
来自ExternalTaskSensor 运算符定义:
等待不同的 DAG 或不同 DAG 中的任务在特定的 execution_date 完成
execution_date 可以使用execution_date_fn 参数定义:
execution_date_fn (Optional[Callable]) – 接收当前执行日期作为第一个位置参数和可选的上下文字典中可用的任意数量的关键字参数的函数,并返回所需的执行日期以进行查询。 execution_delta 或 execution_date_fn 可以传递给 ExternalTaskSensor,但不能同时传递。
您可以这样定义传感器:
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_task_id="external_task_1",
external_dag_id='dag_a_id',
allowed_states=['success', 'failed'],
execution_date_fn=_get_execution_date_of_dag_a,
poke_interval=30
)
_get_execution_date_of_dag_a 使用 get_last_dagrun 对 DB 执行查询,允许您获取 DAG_A 的最后一个 execution_date。
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'dag_a_id', session)
return dag_a_last_run.execution_date
我希望这种方法对您有所帮助。您可以在 this answer 中找到一个工作示例。