【问题标题】:Re-using generic tasks in Luigi在 Luigi 中重用通用任务
【发布时间】:2018-03-09 15:44:51
【问题描述】:

我无法理解如何在 Luigi 中创建可重复使用的任务,然后在具体情况下使用它们。

例如。我有两个通用任务对文件执行某些操作,然后输出结果:

class GffFilter(luigi.Task):
    "Filters a GFF file to only one feature"
    feature = luigi.Parameter()
    out_file = luigi.Parameter()
    in_file = luigi.Parameter()
    ...

class BgZip(luigi.Task):
    "bgZips a file"
    out_file = luigi.Parameter()
    in_file = luigi.Parameter()
    ...

现在,我想要一个首先过滤的工作流,然后使用这些任务对特定文件进行 bgzip 压缩:

class FilterSomeFile(luigi.WrapperTask):
    def requires(self):
        return GffFilter(in_file='some.gff3', out_file='some.genes.gff3', filter='gene')

    def output(self):
        return self.inputs()

class BgZipSomeFile(luigi.Task):
    def run(self):
        filtered = FilterSomeFile()
        BzZip(filtered)

但这很尴尬。在第一个任务中,我没有 run 方法,我只是使用依赖项来使用通用任务。它是否正确?我应该在这里使用继承吗?

然后在第二个任务中,我不能使用依赖项,因为我需要FilterSomeFile 的输出才能使用BgZip。但是使用动态依赖似乎是错误的,因为 luigi 无法构建正确的依赖图。

应该如何从我的通用任务中创建一个 Luigi 工作流程?

【问题讨论】:

    标签: python luigi


    【解决方案1】:

    但这很尴尬。在第一个任务中,我没有运行方法,我只是使用依赖项来使用通用任务。这是正确的吗?

    是的,根据this pageWrapperTask 是一个虚拟任务,其目的是定义任务的工作流,因此它本身不执行任何操作。相反,通过定义多个需求,当requires 方法中列出的每个需求都已完成时,此任务将完成。此WrapperTask 与常规Task 的主要区别在于您不需要定义输出方法来表示此任务已成功,如here 所示。

    然后在第二个任务中,我不能使用依赖项,因为我需要 FilterSomeFile 的输出才能使用 BgZip。但是使用动态依赖似乎是错误的,因为 luigi 无法构建正确的依赖图。

    理论上,您可以使FilterSomeFileGffFilter 具有相同的输出,使BgZipSomeFile 需要FilterSomeFile,然后使用BgZipSomeFile.run 中的FilterSomeFile.output() 访问压缩文件。但是,这个解决方案会有些奇怪,因为:

    • 包装任务仅“运行”1 个其他任务,因此可以直接使用包装任务,而无需创建WrapperTask。更好地使用WrapperTask 需要将BgZipSomeFileFilterSomeFile 合并到WrapperTask 的单个子类中

    • Task 正在运行方法中实例化。这会导致动态依赖,但在此问题中不需要这样做。

    • 最后,GffFilter 的输入在FilterSomeFile 任务中被硬编码,这使得工作流变得不那么有用。这可以通过使WrapperClass 仍然接收参数并将这些参数传递给它的要求来避免。

    更好的解决方案是:

    import luigi as lg
    
    class A(lg.Task):
        inFile = lg.Parameter()
        outFile = lg.Parameter()
    
        def run(self,):
            with open(self.inFile, "r") as oldFile:
                text = oldFile.read()
    
            text  += "*" * 10 + "\n" + "This text was added by task A.\n" + "*" * 10 + "\n"
            print(text)
            with open(self.outFile, "w") as newFile:
                newFile.write(text)
    
        def output(self,):
            return lg.LocalTarget(self.outFile)
    
    class B(lg.Task):
        inFile = lg.Parameter()
        outFile = lg.Parameter()
    
        def run(self,):
            with open(self.inFile, "r") as oldFile:
                text = oldFile.read()
    
            text  += "*" * 10 + "\n" + "This text was added by task B.\n" + "*" * 10 + "\n"
    
            with open(self.outFile, "w") as newFile:
                newFile.write(text)
    
        def output(self,):
            return lg.LocalTarget(self.outFile)
    
    class CustomWorkflow(lg.WrapperTask):
        mainOutFile = lg.Parameter()
        mainInFile = lg.Parameter()
        tempFile = "/tmp/myTempFile.txt"
        def requires(self,):
            return [    A(inFile = self.mainInFile, outFile = self.tempFile),
                        B(inFile = self.tempFile, outFile = self.mainOutFile)
                    ]
    

    此代码可以在命令行中运行:

    PYTHONPATH='.' luigi --module pythonModuleContainingTheTasks --local-scheduler CustomWorkflow --mainInFile ./text.txt --mainOutFile ./procText.txt
    

    【讨论】:

    • 任务 B 失败,因为 myTempFile.txt 不存在。任务 A 和任务 B 之间似乎存在竞争条件。谢谢,因为我正在寻找对此的确认。
    猜你喜欢
    • 2019-01-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-11
    • 1970-01-01
    相关资源
    最近更新 更多