【发布时间】:2021-05-11 18:14:25
【问题描述】:
如何在 Airflow 2.0 中安排 DAG 使其不在节假日运行?
问题 1:每个月的第 5 个工作日运行一次? 问题 2 : 每月第 5 个工作日运行,如果第 5 天是节假日,那么应该在不是节假日的第二天运行?
【问题讨论】:
如何在 Airflow 2.0 中安排 DAG 使其不在节假日运行?
问题 1:每个月的第 5 个工作日运行一次? 问题 2 : 每月第 5 个工作日运行,如果第 5 天是节假日,那么应该在不是节假日的第二天运行?
【问题讨论】:
目前无法做到这一点(至少本机无法做到)。 Airflow DAG 接受单个 cron 表达式或 timedelta。如果你不能用其中之一说出所需的调度逻辑,那么你就不能在 Airflow 中进行这种调度。好消息是 Airflow 有 AIP-39 Richer scheduler_interval 来解决这个问题,并在未来的版本中提供更多的调度功能。
也就是说,您可以通过将 DAG 设置为使用 schedule_interval="@daily" 运行并将 BranchPythonOperator 作为 DAG 的第一个任务来解决此问题。在 Python 可调用文件中,您可以编写所需的调度逻辑,这意味着如果是每月的第 5 个工作日,您的函数将返回 True,否则将返回 False,并且您的工作流将相应地分支。对于 True 它将继续执行,对于 False 它将结束。这并不理想,但它会起作用。一个可能的模板可以是:
def check_fifth():
#write the logic of finding if today is the 5th working day of the month
if logic:
return "continue_task"
else:
return "stop_task"
with DAG(dag_id='stackoverflow',
default_args=default_args,
schedule_interval="@daily",
catchup=False
) as dag:
start_op = BranchPythonOperator(
task_id='choose',
python_callable=check_fifth
)
stop_op = DummyOperator(
task_id='stop_task'
)
#replace with the operator that you want to actually execute
continue_op = YourOperator(
task_id='continue_task'
)
start_op >> [stop_op, continue_op]
【讨论】: