【问题标题】:Airflow ExternalTaskSensor gets stuckAirflow ExternalTask​​Sensor 卡住了
【发布时间】:2017-10-18 09:48:09
【问题描述】:

我正在尝试使用 ExternalTask​​Sensor,但它在戳另一个 DAG 的任务时卡住了,该任务已成功完成。

在这里,第一个 DAG“a”完成了它的任务,然后应该触发通过 ExternalTask​​Sensor 的第二个 DAG“b”。相反,它被困在寻找 a.first_task 上。

第一个 DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)

第二个 DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

我在这里缺少什么?

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    ExternalTaskSensor 假设您依赖于具有相同执行日期的 dag 运行中的任务。

    这意味着在您的情况下,dag ab 需要按相同的时间表运行(例如每天上午 9:00 或 w/e)。

    否则在实例化ExternalTaskSensor时需要使用execution_deltaexecution_date_fn

    这里是运营商内部的文档,以帮助进一步澄清:

    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    
    :type execution_delta: datetime.timedelta
    
    
    :param execution_date_fn: function that receives the current execution date
        and returns the desired execution date to query. Either execution_delta
        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    
    :type execution_date_fn: callable
    

    【讨论】:

    • 所以如果ab 确实按相同的时间表运行,我不需要通过execution_deltaexecution_date_fn,对吗?
    • @JoshHerzberg 我很确定这是正确的,但我已经有一段时间没有使用过这个传感器了。
    • 你知道如何监控日程设置为无的 Dag 吗?
    【解决方案2】:

    为了澄清我在此处和其他相关问题上看到的内容,dag 不一定必须按照已接受的答案中所述的相同时间表运行。 dag 也不需要具有相同的start_date。如果您创建的ExternalTaskSensor 任务没有execution_deltaexecution_date_fn,那么这两个dag 需要具有相同的执行日期。碰巧的是,如果两个 dag 具有相同的计划,则每个间隔中的计划运行将具有相同的执行日期。我不确定手动触发的计划 dag 运行的执行日期是什么。

    要使这个示例正常工作,dag bExternalTaskSensor 任务需要一个 execution_deltaexecution_date_fn 参数。如果使用execution_delta 参数,则应该是b 的执行日期-execution_delta = a 的执行日期。如果使用execution_date_fn,则该函数应返回a 的执行日期。

    如果您使用TriggerDagRunOperator,然后使用ExternalTaskSensor 来检测该dag 何时完成,您可以执行类似的操作,例如使用TriggerDagRunOperator 的@987654338 将主dag 的执行日期传递给触发日期@ 参数,如execution_date='{{ execution_date }}'。那么两个 dag 的执行日期将是相同的,并且您不需要每个 dag 的计划相同,也不需要使用 execution_deltaexecution_date_fn 传感器参数。

    以上是在 Airflow 1.10.9 上编写和测试的

    【讨论】:

      【解决方案3】:

      从 Airflow v1.10.7 开始,tomcm 的回答不正确(至少对于这个版本)。如果他们没有相同的时间表,则应使用execution_deltaexecution_date_fn 来确定外部 DAG 的日期和时间表。

      【讨论】:

        【解决方案4】:

        来自我的成功案例:

        default_args = {
            'owner': 'xx',
            'retries': 2,
            'email': ALERT_EMAIL_ADDRESSES,
            'email_on_failure': True,
            'email_on_retry': False,
            'retry_delay': timedelta(seconds=30),
            # avoid stopping tasks after one day
            'depends_on_past': False,
        }
        
        dag = DAG(
            dag_id = dag_id,
            # get the datetime type value
            start_date = pendulum.strptime(current_date, "%Y, %m, %d, %H").astimezone('Europe/London').subtract(hours=1),
            description = 'xxx',
            default_args = default_args,
            schedule_interval = timedelta(hours=1),
            )
        
        ...
            external_sensor= ExternalTaskSensor(
                    task_id='ext_sensor_task_update_model',
                    external_dag_id='xxx',
                    external_task_id='xxx'.format(log_type),
                    # set the task_id to None because of the end_task
                    # external_task_id = None,
                    dag=dag,
                    timeout = 300,
                    )
        ...
        

        您可以等到任务自动触发成功。不要手动操作,start_date会不一样。

        【讨论】:

          【解决方案5】:

          默认情况下,Airflow 会查找相同的执行日期、时间戳。如果我们使用 execution_date_fn 参数,我们必须返回要查找的时间戳值列表。在内部,传感器将查询气流的 task_instance 表,以检查 dag 运行的 dagid、taskid、状态和执行日期时间戳作为参数提供。因此,如果我们使用 None 计划,则必须手动触发 dag,在这种情况下,日期时间戳可能是任何可能的值。 我在这里详细解释过: https://link.medium.com/QzXm21asokb

          我创建了一个继承 ExternalTask​​Sensor 的新传感器,它可用于监控具有 None 计划的 dag。您可以在以下 repo 中找到代码。 https://github.com/Deepaksai1919/AirflowTaskSensor

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2018-06-21
            • 2019-06-05
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多