【发布时间】:2021-05-24 05:14:28
【问题描述】:
我们可以为同一个 DAG 中的不同任务设置不同的 schedule_intervals 吗?
即我有一个包含三个任务的 DAG,A >> B >> C。我希望上游任务 A 和 B 每周运行,但对于下游任务 C,我希望它每天运行。是否可以?如果是,DAG 和任务的 schedule_interval 应该是多少?
【问题讨论】:
我们可以为同一个 DAG 中的不同任务设置不同的 schedule_intervals 吗?
即我有一个包含三个任务的 DAG,A >> B >> C。我希望上游任务 A 和 B 每周运行,但对于下游任务 C,我希望它每天运行。是否可以?如果是,DAG 和任务的 schedule_interval 应该是多少?
【问题讨论】:
您可以使用两个选项ShortCircuitOperator 或BranchDayOfWeekOperator。
1 在该用例中使用BranchDayOfWeekOperator。该运算符根据一周中的特定日期进行分支:
with DAG('my_dag',
schedule_interval='@daily'
) as dag:
task1 = DummyOperator(task_id='TASK1')
task2 = DummyOperator(task_id='TASK2')
task3 = DummyOperator(task_id='TASK3')
end_task = DummyOperator(task_id='end_task')
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="TASK3",
follow_task_ids_if_false="end_task",
week_day="Monday",
)
task1 >> task2 >> branch >> [task3, end_task]
在此示例中,task3 将仅在星期一执行,而 task1 和 task2 将每天执行。
请注意,此运算符仅适用于 Airflow >=2.1.0,但是您可以复制运算符源代码并创建本地版本。
2 使用ShortCircuitOperator:
from datetime import date
def func():
if date.today().weekday() == 0:
return True
return False
with DAG('my_dag',
schedule_interval='@daily'
) as dag:
task1 = DummyOperator(task_id='TASK1')
task2 = DummyOperator(task_id='TASK2')
task3 = DummyOperator(task_id='TASK3')
verify = ShortCircuitOperator(task_id='check_day', python_callable=func)
task1 >> task2 >> verify >> task3
【讨论】: