【发布时间】:2018-10-15 09:57:54
【问题描述】:
我正在使用气流来编排一个 etl 进程,并且我已经构建了以下任务:
for sql_file in sql_files:
t1 = PythonOperator(
task_id= sql_file + '_run',
provide_context=True,
python_callable=run_sql_from_file,
op_kwargs={'filename': sql_file + '.sql',
'connection': connection,
'logger': logger
},
trigger_rule='all_done',
dag=dag)
所有 sql 文件都具有相同的结构,并且是此脚本的变体:
delete from databaseY.tableX;
insert into databaseY.tableX
select *
from databaseZ.tableW as bs
inner join databaseW.tableY as cd on bs.id_camp = cd.id_camp
首先删除数据表的记录,然后插入新记录。
当我从气流 dag 运行此任务时,我没有收到任何错误,实际上气流告诉我所有任务都已成功运行。不幸的是,当 dag 完成时,一些数据表是空的。我认为这是由于任务从数据表中删除数据,但从未完成插入新数据的事实引起的。我认为气流超时,但我不知道在哪里可以确保不会发生这种情况。
问题:我该如何解决这个问题?
【问题讨论】:
标签: python-3.x airflow