【发布时间】:2021-05-16 09:39:39
【问题描述】:
import luigi
class FileToStaging(ImportToTable):
filename = Luigi.Parameter(default = '')
#import file from some folder to a staging database
def requires(self):
return luigi.LocalTarget(self.filename)
#truncate table
#load the file into staging
class StgToOfficial(RunQuery):
filename = Luigi.Parameter
# run a process in the database to load data from staging to the final table
def requires(self):
return FileToStaging(self.filename)
# run query
class LoadFileGroups(luigi.WrapperTask):
def requires(self):
list_of_files = get_list_of_files_currently_in_folder() # The folder can have an arbitrary number of files inside
for file in list_of_files:
yield(StgToOfficial(filename = file))
大家好,
我是 Luigi 的新手,正在尝试使用该框架构建 ETL 流程。
想象一下我有一个类似于前面sn-p的伪代码的过程。该过程必须检查文件夹并获取其中的文件列表。然后,一一导入staging数据库,运行一个流程,将staging中的数据加载到最终表中。
问题在于,使用前面的解决方案,所有加载到临时表的文件(随后是每个文件的加载过程)都是并行运行的,这是不可能发生的。如何强制 Luigi 按顺序执行任务?只有当一个文件完成最终表中的加载时,才导入下一个文件,依此类推。 (查看下面的草稿以获得简化的草稿)
Draft of the structure I'm trying to achieve
我知道我应该使用 requires 方法来确保顺序,但是对于要加载的未知数量的文件,我该如何动态地做到这一点?
非常感谢您的帮助。
【问题讨论】:
标签: python dynamic etl sequential luigi