【问题标题】:Airflow tells me dag has run successfully while it has finished its tasksAirflow 告诉我 dag 在完成任务时已成功运行
【发布时间】:2018-10-15 09:57:54
【问题描述】:

我正在使用气流来编排一个 etl 进程,并且我已经构建了以下任务:

for sql_file in sql_files:
  t1 = PythonOperator(
    task_id= sql_file + '_run',
    provide_context=True,
    python_callable=run_sql_from_file,
    op_kwargs={'filename': sql_file + '.sql',
               'connection': connection,
               'logger': logger
               },
    trigger_rule='all_done',
    dag=dag)

所有 sql 文件都具有相同的结构,并且是此脚本的变体:

    delete from databaseY.tableX;

    insert into databaseY.tableX
    select *
    from     databaseZ.tableW as bs
    inner join databaseW.tableY as cd on bs.id_camp = cd.id_camp

首先删除数据表的记录,然后插入新记录。

当我从气流 dag 运行此任务时,我没有收到任何错误,实际上气流告诉我所有任务都已成功运行。不幸的是,当 dag 完成时,一些数据表是空的。我认为这是由于任务从数据表中删除数据,但从未完成插入新数据的事实引起的。我认为气流超时,但我不知道在哪里可以确保不会发生这种情况。

问题:我该如何解决这个问题?

【问题讨论】:

    标签: python-3.x airflow


    【解决方案1】:

    您是否将这个 python 函数作为 async 或 sync 运行或检查 run_sql_from_file 中的 db 操作状态?如果你以异步方式运行它,它可能不会返回状态。在这种情况下,我总是写运算符来检查数据库的运行状态。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-11-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-02-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多