【问题标题】:Apache airflow macro to get last dag run execution timeApache 气流宏获取最后 dag 运行执行时间
【发布时间】:2019-01-26 03:59:56
【问题描述】:

我认为 here 列出的宏 prev_execution_date 可以让我获得最后一次 DAG 运行的执行日期,但查看源代码似乎只能根据 DAG 时间表获得最后一个日期。

prev_execution_date = task.dag.previous_schedule(self.execution_date)

当 DAG 没有按计划运行时,有什么方法可以通过宏获取 DAG 的执行日期?

【问题讨论】:

标签: airflow


【解决方案1】:

是的,您可以为此定义自己的自定义宏,如下所示:

# custom macro function
def get_last_dag_run(dag):
    last_dag_run = dag.get_last_dagrun()
    if last_dag_run is None:
        return "no prev run"
    else:
        return last_dag_run.execution_date.strftime("%Y-%m-%d")

# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
      schedule_interval='@daily',
      user_defined_macros={
          'last_dag_run_execution_date': get_last_dag_run
      }
)

# example of using it in practice
print_vals = BashOperator(
    task_id='print_vals',
    bash_command='echo {{ last_dag_run_execution_date(dag) }}',
    dag=dag
)

请注意,dag.get_last_run() 只是 Dag 对象上可用的众多函数之一。这是我找到它的地方:https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396

您还可以调整日期格式的字符串格式,以及如果没有以前的运行,您想要输出的内容。

【讨论】:

  • dag.get_last_dagrun(include_externally_triggered=True) 用于外部触发dags
【解决方案2】:

您可以制作自己的用户自定义宏功能,使用气流模型搜索元数据库。

def get_last_dag_run(dag_id):
  //TODO search DB
  return xxx

dag = DAG(
    'example',
    schedule_interval='0 1 * * *',
    user_defined_macros={
        'last_dag_run_execution_date': get_last_dag_run,
    }
)

然后在模板中使用 KEY。

【讨论】:

  • 这个答案好像只回答了一半,省略了DB搜索。
猜你喜欢
  • 1970-01-01
  • 2017-09-06
  • 2022-11-02
  • 1970-01-01
  • 2021-05-03
  • 1970-01-01
  • 2018-05-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多