【问题标题】:Can we configure different schedule_interval for different tasks within a DAG?我们可以为 DAG 中的不同任务配置不同的 schedule_interval 吗?
【发布时间】:2021-05-24 05:14:28
【问题描述】:

我们可以为同一个 DAG 中的不同任务设置不同的 schedule_intervals 吗?

即我有一个包含三个任务的 DAG,A >> B >> C。我希望上游任务 A 和 B 每周运行,但对于下游任务 C,我希望它每天运行。是否可以?如果是,DAG 和任务的 schedule_interval 应该是多少?

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    您可以使用两个选项ShortCircuitOperatorBranchDayOfWeekOperator

    1 在该用例中使用BranchDayOfWeekOperator。该运算符根据一周中的特定日期进行分支:

    with DAG('my_dag',
         schedule_interval='@daily'
         ) as dag:
        task1 = DummyOperator(task_id='TASK1')
        task2 = DummyOperator(task_id='TASK2')
        task3 = DummyOperator(task_id='TASK3')
        end_task = DummyOperator(task_id='end_task')
        branch = BranchDayOfWeekOperator(
            task_id="make_choice",
            follow_task_ids_if_true="TASK3",
            follow_task_ids_if_false="end_task",
            week_day="Monday",
        )
        task1 >> task2 >> branch >> [task3, end_task]
    

    在此示例中,task3 将仅在星期一执行,而 task1task2 将每天执行。

    请注意,此运算符仅适用于 Airflow >=2.1.0,但是您可以复制运算符源代码并创建本地版本。

    2 使用ShortCircuitOperator

    from datetime import date
    def func():
        if date.today().weekday() == 0:
            return True
        return False
    
    with DAG('my_dag',
         schedule_interval='@daily'
         ) as dag:
        task1 = DummyOperator(task_id='TASK1')
        task2 = DummyOperator(task_id='TASK2')
        task3 = DummyOperator(task_id='TASK3')
        verify = ShortCircuitOperator(task_id='check_day', python_callable=func)
        task1 >> task2 >> verify >> task3
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-10-16
      • 2018-11-28
      • 2017-04-29
      相关资源
      最近更新 更多