【问题标题】:Ensure sequential running of tasks (Apache Airflow)确保任务的顺序运行(Apache Airflow)
【发布时间】:2017-06-14 21:50:45
【问题描述】:

在顺序执行器下,我有一个 DAG 文件,我在其中指定了三个需要顺序运行的任务(t1-->t2-->t3):

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 6, 14, 23 , 20),
    'email_on_failure': False,
    'email_on_retry': False,
    }

dag = DAG('test_dag', default_args=default_args, schedule_interval="*/5 * * * *")

t1 = BashOperator(
    task_id='form_dataset',
    bash_command='python 1.py',
    dag=dag)

t2 = BashOperator(
    task_id='form_features',
    bash_command='python 2.py',
    depends_on_past=True,
    dag=dag)

t3 = BashOperator(
    task_id='train',
    bash_command='python 3.py',
    depends_on_past=True,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)

我假设顺序行为 t1-->t2-->t3 是默认行为,但认为在我的情况下并非如此(顺序几乎是随机的,例如 t1-->t2-->t2- ->t1-->t3)。我缺少什么样的论点可以纠正这种行为?

【问题讨论】:

    标签: airflow apache-airflow airflow-scheduler


    【解决方案1】:

    你需要添加语句

    t1 >> t2 >> t3
    

    在文件末尾。有关这方面的更多详细信息,请访问以下链接: https://airflow.incubator.apache.org/concepts.html#bitshift-composition

    为了完整起见,您也可以使用任务的 set_upstream() 或 set_downstream() 方法来完成。

    【讨论】:

    • 感谢您的回复,我其实有(抱歉我没有把它放在代码中)。我想问题是所有三个任务的总运行时间都大于调度间隔。我想设置一个 DAG 中所有任务的执行优先级,而不是运行一个新的 DAG,但到目前为止还没有找到相关的属性。
    • 每次 DAG 运行的执行是否依赖于上一次运行的成功完成?在这种情况下,您可以查看参数depends_on_true 的行为。还有另一个选项可以将每个任务分配给一个池并将该池的大小限制为 1。或者我可能误解了您的用例?一般来说,dag 任务应该尽可能独立于其他运行。
    • 在airflow.cfg 中有一个设置max_active_runs_per_dag。将此设置为 1 还应该防止相同的 dag 在前一个完成之前再次开始。
    • 非常感谢 - 'max_active_runs_per_dag' 正是我们所需要的!祝你在 ETH 学习顺利(不久前在那里毕业)
    猜你喜欢
    • 1970-01-01
    • 2020-06-21
    • 1970-01-01
    • 2019-08-12
    • 1970-01-01
    • 2011-11-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多