【问题标题】:Airflow ExternalTaskSensor with different scheduler interval具有不同调度程序间隔的 Airflow ExternalTask​​Sensor
【发布时间】:2021-07-15 22:02:13
【问题描述】:

目前我有两个 DAG:DAG_A 和 DAG_B。两者都运行schedule_interval=timedelta(days=1)

DAG_A 有一个 Task1,通常需要 7 小时才能运行。而 DAG_B 只需要 3 小时。

DAG_B 有一个ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1"),但也使用了一些其他每小时生成的信息 X。

增加 DAG_B 的频率以使其每天至少运行 4 次的最佳方法是什么?据我所知,两个 DAG 必须具有相同的 schedule_interval。但是,我想尽可能多地更新 DAG_B 上的 X。


一种可能性是为 DAG_B 创建另一个具有 ExternalTask​​Sensor 的 DAG。但我认为这不是最好的方法。

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    如果我理解正确,你的条件是:

    • 每天保持运行DAG_A
    • 每天运行 DAG_B n
    • 每次 DAG_B 运行时,它都会等待 DAG_A__Task_1 完成

    我认为您可以通过指示 ExternalTaskSensor 等待 DAG_A 的期望执行日期来轻松调整当前设计。

    来自ExternalTaskSensor 运算符定义:

    等待不同的 DAG 或不同 DAG 中的任务在特定的 execution_date 完成

    execution_date 可以使用execution_date_fn 参数定义:

    execution_date_fn (Optional[Callable]) – 接收当前执行日期作为第一个位置参数和可选的上下文字典中可用的任意数量的关键字参数的函数,并返回所需的执行日期以进行查询。 execution_delta 或 execution_date_fn 可以传递给 ExternalTask​​Sensor,但不能同时传递。

    您可以这样定义传感器:

        wait_for_dag_a = ExternalTaskSensor(
            task_id='wait_for_dag_a',
            external_task_id="external_task_1",
            external_dag_id='dag_a_id',
            allowed_states=['success', 'failed'],
            execution_date_fn=_get_execution_date_of_dag_a,
            poke_interval=30
        )
    

    _get_execution_date_of_dag_a 使用 get_last_dagrun 对 DB 执行查询,允许您获取 DAG_A 的最后一个 execution_date

    from airflow.utils.db import provide_session
    from airflow.models.dag import get_last_dagrun
    
    @provide_session
    def _get_execution_date_of_dag_a(exec_date, session=None,  **kwargs):
        dag_a_last_run = get_last_dagrun(
            'dag_a_id', session)
        return dag_a_last_run.execution_date
    

    我希望这种方法对您有所帮助。您可以在 this answer 中找到一个工作示例。

    【讨论】:

    • 您知道是否可以更改 _get_execution_date_of_dag_a 函数,以允许参数动态获取不同 dag_id 的最后一次运行?
    • 我想通了! < @provide_session def get_last_run_lambda(exteral_dag_id, session): dag_last_run = get_last_dagrun(exteral_dag_id, session) last_exec_date = dag_last_run.execution_date return lambda x: last_exec_date >
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-21
    • 2019-06-05
    • 1970-01-01
    • 2022-11-08
    • 1970-01-01
    相关资源
    最近更新 更多