【发布时间】:2021-04-24 11:26:09
【问题描述】:
我正在努力解决我不理解的 Luigi 错误。我不知道这是一个已知问题、Luigi 的限制还是我做错了什么。
我在一个涉及许多任务和许多依赖项的实际问题中使用 Luigi。但是,我做了一个玩具示例,其中清楚地显示了此问题。
让我们考虑两个任务,TaskA 和 TaskB,TaskA 需要执行两个先前具有不同 Luigi 参数值的 TaskB 实例。
如果我在 TaksA 的 requires() 方法中编写依赖关系,则不会发生任何不好的事情。所有三个任务都执行了,我得到了我的退出文件。
但如果我在 TaksA 的 run() 方法中编写依赖关系,那么我会得到丑陋的 TaskClassAmbigiousException。
在我的实际问题中,我无法在 requires() 方法中产生任务,因为我需要知道在 quieres() 方法中也产生的前一个任务的结果,所以我尝试在run() 得到了同样的异常。
好的,这是玩具示例的代码。首先,在 requieres() 中生成任务,它可以工作:
import luigi
class TaskB(luigi.Task):
j = luigi.IntParameter(default=1)
def output(self):
return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))
def requires(self):
pass
def run(self):
print_file = 'TaskB' + str(self.j)
with self.output().open('w') as out_file:
out_file.write(print_file)
class TaskA(luigi.Task):
i = luigi.IntParameter(default=1)
def output(self):
return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))
def requires(self):
yield TaskB(j=self.i)
yield TaskB(j=self.i+1)
def run(self):
print_file = ""
for input_target in self.input():
with input_target.open('r') as in_file:
for line in in_file:
print_file+=line + 'TaskA' + str(self.i)
with self.output().open('w') as out_file:
out_file.write(print_file)
if __name__ == '__main__':
taskA = TaskA(i=2)
其次,在 run() 中产生任务,我得到了这个:
File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in _handle_next_task
for module, name, params in new_requirements]
File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in <listcomp>
for module, name, params in new_requirements]
File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 251, in load_task
task_cls = Register.get_task_cls(task_name)
File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 181, in get_task_cls
raise TaskClassAmbigiousException('Task %r is ambiguous' % name)
代码:
import luigi
class TaskB(luigi.Task):
j = luigi.IntParameter(default=1)
def output(self):
return luigi.LocalTarget("data/outputB{j}.txt".format(j=self.j))
def requires(self):
pass
def run(self):
print_file = 'TaskB' + str(self.j)
with self.output().open('w') as out_file:
out_file.write(print_file)
class TaskA(luigi.Task):
i = luigi.IntParameter(default=1)
def output(self):
return luigi.LocalTarget("data/outputA{i}.txt".format(i=self.i))
def requires(self):
pass
def run(self):
print_file = ""
target1 = yield TaskB(j=self.i)
target2 = yield TaskB(j=self.i+1)
for input_target in [target1, target2]:
with input_target.open('r') as in_file:
for line in in_file:
print_file+=line + 'TaskA' + str(self.i)
with self.output().open('w') as out_file:
out_file.write(print_file)
if __name__ == '__main__':
taskA = TaskA(i=2)
luigi.build([taskA], workers=1,local_scheduler=True,log_level='WARNING')
编辑:我编辑添加另一个相关问题。因为我想要做的是产生一个任务,其参数取决于先前产生的任务,如果这对我来说可能就足够了:
def requires(self):
taskb_target = yield TaskB(j=self.i)
taskb_target.open('r')
# do something and yield next Task depending on what taskb_target has
yield TaskB(j=self.i+1)
但不幸的是,这不起作用。 Luigi 说“NoneType”对象没有“open”属性。
但是,当您在 run() 方法中生成任务时,您可以在运行时访问输出。好像有很大的不对称……
第二次编辑:
我做了更多的试验,发现了一个奇怪的结论:我在原始问题中编写的第二段代码(在 .py 文件中时)可以永远执行,即使删除输出文件和所以迫使 luigi 重新执行任务。但是,第一段代码只能执行一次(然后,在第一次执行时,它可以工作!!)。但是如果你删除文件并再次执行代码,你会得到模棱两可的任务错误。
我认为这与 luigi 的 Register 对象有关。但真正让我感到困惑的是,无论我是在 requieres 还是在 run 方法中生成 taskB,这种行为都是不同的。
我仍然不知道在重新定义已经在 luigi 的注册模块中的类任务时是否会出现问题。可能是...我还尝试将类定义放在与主 .py 不同的 .py 中,但是在运行两次时它会中断。只有重启内核才能正常运行,你只有一次机会!
【问题讨论】: