【发布时间】:2018-03-09 15:44:51
【问题描述】:
我无法理解如何在 Luigi 中创建可重复使用的任务,然后在具体情况下使用它们。
例如。我有两个通用任务对文件执行某些操作,然后输出结果:
class GffFilter(luigi.Task):
"Filters a GFF file to only one feature"
feature = luigi.Parameter()
out_file = luigi.Parameter()
in_file = luigi.Parameter()
...
class BgZip(luigi.Task):
"bgZips a file"
out_file = luigi.Parameter()
in_file = luigi.Parameter()
...
现在,我想要一个首先过滤的工作流,然后使用这些任务对特定文件进行 bgzip 压缩:
class FilterSomeFile(luigi.WrapperTask):
def requires(self):
return GffFilter(in_file='some.gff3', out_file='some.genes.gff3', filter='gene')
def output(self):
return self.inputs()
class BgZipSomeFile(luigi.Task):
def run(self):
filtered = FilterSomeFile()
BzZip(filtered)
但这很尴尬。在第一个任务中,我没有 run 方法,我只是使用依赖项来使用通用任务。它是否正确?我应该在这里使用继承吗?
然后在第二个任务中,我不能使用依赖项,因为我需要FilterSomeFile 的输出才能使用BgZip。但是使用动态依赖似乎是错误的,因为 luigi 无法构建正确的依赖图。
应该如何从我的通用任务中创建一个 Luigi 工作流程?
【问题讨论】: