【问题标题】:Create an Airflow ExternalTaskSensor for a specific run of an external Task that runs multiple times in a day为一天内运行多次的外部任务的特定运行创建 Airflow ExternalTask​​Sensor
【发布时间】:2019-09-17 02:12:05
【问题描述】:

我正在尝试在不同 DAG(我们将其称为 DAG A)中的任务上创建一个外部传感器(在 DAG B 中),该传感器按以下时间间隔运行:'schedule_interval': '0 4,6,8,10 ,12,14,16,18,20,22 * * *'。 DAG B 计划在每天凌晨 2 点运行。我想在 DAG B 中创建一个传感器任务,检查 DAG A 中外部任务的凌晨 4 点运行是否成功

我无法重新安排我的 DAG B 在 4 点运行,因为 DAG B 中有其他任务需要在 2 点运行。我尝试更改 window_size 和 window_offset 参数,但它不起作用。

ExternalTask​​Sensor 方法已被如下覆盖

from airflow.models import TaskInstance, DagRun

def return_start_end_time(self, context):
  execution_date = context.get('next_execution_date')
  return (execution_date - self.window_offset - self.window_size,
                execution_date - self.window_offset)

def poke(self, context):
  start_date, end_date = self.return_start_end_time(context)
  expected_executions = date_range(start_date, end_date,
                                         delta=self.dep_dag_schedule)
  TI = TaskInstance
  DR = DagRun

  executions = (
            session.query(TI.dag_id, TI.task_id, TI.execution_date, 
  TI.state)
                .join(DR, and_(DR.dag_id == TI.dag_id, 
  DR.execution_date == TI.execution_date))
                .filter(TI.dag_id == self.external_dag_id,
                        TI.task_id == self.external_task_id,
                        TI.execution_date.in_(expected_executions),
                        DR.run_id.startswith('scheduled__'))
                .order_by(TI.execution_date.desc()).all()
   )

Task Sensor的代码如下:

wait_task = CustomTaskSensor(
            task_id=wait_task,
            poke_interval=60,
            dag=dag,
            external_dag_id=DAGA,
            external_task_id=TaskA,
            window_size=timedelta(days=0, hours=5),
            window_offset=timedelta(days=0,hours=-5),
            execution_timeout=timedelta(hours=5),
            success_fn=MOST_RECENT_SUCCESS
        )

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:
    :param execution_date_fn: function that receives the current execution date
        and returns the desired execution dates to query. Either execution_delta
        or execution_date_fn can be passed to ExternalTaskSensor, but not both. 
    :type execution_date_fn: callable
    
    ..
    elif self.execution_date_fn:
        dttm = self.execution_date_fn(context['execution_date'])
    ..
    
    dttm_filter = dttm if isinstance(dttm, list) else [dttm]
    serialized_dttm_filter = ','.join(
        [datetime.isoformat() for datetime in dttm_filter])
    

    【讨论】:

      【解决方案2】:

      我们有自己的自定义 ExternalTask​​Sensor 类,它没有 execution_date_fn 和 execution_delta 参数。相反,我们有诸如 window_size 和 window_offset 之类的参数。方法的实现方式,Task Sensor 的执行日期与实际执行日期相同(不是默认值)。这意味着如果包含 TaskSensor 的 DAG 在 9/17 凌晨 2 点触发,则传感器的执行日期设置为 9/17 凌晨 2 点。但是,外部任务的执行日期设置为之前的执行日期(这是默认的 Lakitu 行为),即如果外部任务在 9/17 凌晨 4 点运行,则执行日期设置为 9/16 晚上 10 点(这是之前的执行日期)。我必须定义我的 window_size 和 window_offset 参数,以便外部任务的执行日期落在使用 return_start_end_time 函数计算的窗口内(其中执行日期是指 TaskSensor 的执行日期)。

      【讨论】:

      • 如果上游 DAG 没有时间表,这会起作用吗?我注意到您有“expected_executions = date_range(start_date, end_date, delta=self.dep_dag_schedule)”这一行,但我的上游 DAG 可能随时被触发。我猜我必须更改 execution_date.in_ 部分来检查执行日期是否在一个范围内而不是一个列表内。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-26
      • 2021-06-19
      • 2012-01-26
      • 2020-06-21
      • 1970-01-01
      • 2020-11-23
      相关资源
      最近更新 更多