【问题标题】:DAG schedule in Airflow 2.0Airflow 2.0 中的 DAG 计划
【发布时间】:2021-05-11 18:14:25
【问题描述】:

如何在 Airflow 2.0 中安排 DAG 使其不在节假日运行?

问题 1:每个月的第 5 个工作日运行一次? 问题 2 : 每月第 5 个工作日运行,如果第 5 天是节假日,那么应该在不是节假日的第二天运行?

【问题讨论】:

    标签: airflow-scheduler airflow


    【解决方案1】:

    目前无法做到这一点(至少本机无法做到)。 Airflow DAG 接受单个 cron 表达式或 timedelta。如果你不能用其中之一说出所需的调度逻辑,那么你就不能在 Airflow 中进行这种调度。好消息是 Airflow 有 AIP-39 Richer scheduler_interval 来解决这个问题,并在未来的版本中提供更多的调度功能。

    也就是说,您可以通过将 DAG 设置为使用 schedule_interval="@daily" 运行并将 BranchPythonOperator 作为 DAG 的第一个任务来解决此问题。在 Python 可调用文件中,您可以编写所需的调度逻辑,这意味着如果是每月的第 5 个工作日,您的函数将返回 True,否则将返回 False,并且您的工作流将相应地分支。对于 True 它将继续执行,对于 False 它将结束。这并不理想,但它会起作用。一个可能的模板可以是:

    def check_fifth():
        #write the logic of finding if today is the 5th working day of the month
    
        if logic:
            return "continue_task"
        else:
            return "stop_task"
    
    with DAG(dag_id='stackoverflow',
             default_args=default_args,
             schedule_interval="@daily",
             catchup=False
             ) as dag:
    
        start_op = BranchPythonOperator(
            task_id='choose',
            python_callable=check_fifth
        )
    
        stop_op = DummyOperator(
            task_id='stop_task'
        )
    
        #replace with the operator that you want to actually execute
        continue_op = YourOperator(
            task_id='continue_task'
        )
    
        start_op >> [stop_op, continue_op]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多