【问题标题】:Run taskA and run next tasks with parameters, that returned taskA in luigi运行 taskA 并使用参数运行下一个任务,在 luigi 中返回 taskA
【发布时间】:2018-01-24 08:44:13
【问题描述】:

我有一个任务,它会生成应该处理的文件:

class TaskA(luigi.Task):
    def run(self):
        # some code which generates list of files into output()
    def output(self):
        return luigi.LocalTarget(filepath='/path/to/process_these_files.json')

我有包装任务,它应该运行 TaskA,获取参数,并使用我放入 process_these_files.json 的值运行处理任务

class RunAll(luigi.WrapperTask):
    def requires(self):
        files = json.load(TaskA().open('r'))
        for file in files:
            yield ProcessFileTask(file=file)

有什么办法吗?

【问题讨论】:

    标签: python pipeline luigi


    【解决方案1】:

    您可以使用动态依赖项。这些是在运行时知道的依赖项。每次您 yield 动态依赖时,run() 方法将一直保持到依赖完成为止。

    例如:

    class RunAll(luigi.WrapperTask): 
        def requires(self): 
            return TaskA() 
    
        def run(self):
            files = json.load(self.input().open('r')) 
            for file in files: 
                yield ProcessFileTask(file=file)
    

    另见https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies

    【讨论】:

    • 我已经尝试过这种方法,但是如果我使用 2 个以上的工作人员,即使完成了来自动态依赖项的任务,也会出现无限循环问题
    猜你喜欢
    • 2017-03-10
    • 2020-01-02
    • 1970-01-01
    • 1970-01-01
    • 2020-08-10
    • 2012-06-28
    • 2014-06-21
    • 1970-01-01
    • 2021-04-24
    相关资源
    最近更新 更多