【发布时间】:2019-09-17 02:12:05
【问题描述】:
我正在尝试在不同 DAG(我们将其称为 DAG A)中的任务上创建一个外部传感器(在 DAG B 中),该传感器按以下时间间隔运行:'schedule_interval': '0 4,6,8,10 ,12,14,16,18,20,22 * * *'。 DAG B 计划在每天凌晨 2 点运行。我想在 DAG B 中创建一个传感器任务,检查 DAG A 中外部任务的凌晨 4 点运行是否成功
我无法重新安排我的 DAG B 在 4 点运行,因为 DAG B 中有其他任务需要在 2 点运行。我尝试更改 window_size 和 window_offset 参数,但它不起作用。
ExternalTaskSensor 方法已被如下覆盖
from airflow.models import TaskInstance, DagRun
def return_start_end_time(self, context):
execution_date = context.get('next_execution_date')
return (execution_date - self.window_offset - self.window_size,
execution_date - self.window_offset)
def poke(self, context):
start_date, end_date = self.return_start_end_time(context)
expected_executions = date_range(start_date, end_date,
delta=self.dep_dag_schedule)
TI = TaskInstance
DR = DagRun
executions = (
session.query(TI.dag_id, TI.task_id, TI.execution_date,
TI.state)
.join(DR, and_(DR.dag_id == TI.dag_id,
DR.execution_date == TI.execution_date))
.filter(TI.dag_id == self.external_dag_id,
TI.task_id == self.external_task_id,
TI.execution_date.in_(expected_executions),
DR.run_id.startswith('scheduled__'))
.order_by(TI.execution_date.desc()).all()
)
Task Sensor的代码如下:
wait_task = CustomTaskSensor(
task_id=wait_task,
poke_interval=60,
dag=dag,
external_dag_id=DAGA,
external_task_id=TaskA,
window_size=timedelta(days=0, hours=5),
window_offset=timedelta(days=0,hours=-5),
execution_timeout=timedelta(hours=5),
success_fn=MOST_RECENT_SUCCESS
)
【问题讨论】: