【发布时间】:2019-04-25 14:47:41
【问题描述】:
我正在使用airflow.operators.sensors.ExternalTaskSensor 让一个 Dag 等待另一个。
dag = DAG(
'dag2',
default_args={
'owner': 'Me',
'depends_on_past': False,
'start_date': start_datetime,
'email': ['me@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=10),
},
template_searchpath="%s/me/resources/" % DAGS_FOLDER,
schedule_interval="{} {} * * *".format(minute, hour),
max_active_runs=1
)
wait_for_dag1 = ExternalTaskSensor(
task_id='wait_for_dag1',
external_dag_id='dag1',
external_task_id='dag1_task1',
dag=dag
)
如果上游 Dag 发生严重错误并且无法在给定时间段内完成,我希望上游 Dag(ExternalTaskSensor 运算符)也崩溃,而不是永远挂起。
如何向 ExternalTaskSensor 添加超时?
我正在查看文档,但它似乎没有 timeout 参数或类似的东西。我该怎么办?
https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html
【问题讨论】:
-
如果我的问题正确,您可以随时从 metadb 检查 dag 的状态,如果状态失败,您可以引发 AirfloException
标签: airflow