【问题标题】:How do you pass multiple arguments to a Luigi subtask?如何将多个参数传递给 Luigi 子任务?
【发布时间】:2016-08-24 12:13:41
【问题描述】:

我有一个 Luigi 任务,requires 是一个子任务。子任务取决于父任务传递的参数(即执行requireing 的那个)。我知道你可以通过设置来指定子任务可以使用的参数...

def requires(self):
    return subTask(some_parameter)

...然后在子任务上,通过设置接收参数...

x = luigi.Parameter()

这似乎只允许您通过一个参数。通过任意数量的参数(无论我想要什么类型)发送的最佳方式是什么?我真的想要这样的东西:

class parentTask(luigi.Task):

    def requires(self):
        return subTask({'file_postfix': 'foo',
                        'file_content': 'bar'
        })

    def run(self):
        return


class subTask(luigi.Task):
    params = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.params['file_content'])

如您所见,我尝试使用 luigi.DictParameter 而不是直接的 luigi.Parameter,但是当我运行上述代码时,我从 Luigi 深处的某个地方得到了 TypeError: unhashable type: 'dict'

运行 Python 2.7.11、Luigi 2.1.1

【问题讨论】:

    标签: python luigi


    【解决方案1】:

    通过任意数量的发送的最佳方式是什么 参数,我想要什么类型的?

    最好的方法是使用命名参数,例如,

    #in requires
    return MySampleSubTask(x=local_x, y=local_y)
    
    class MySampleSubTask(luigi.Task):
        x = luigi.Parameter()
        y = luigi.Parameter()
    

    【讨论】:

      【解决方案2】:

      通过任意数量的发送的最佳方式是什么 参数,我想要什么类型的?

      你可以按照这个例子。您将有一个占位符来定义所有需要传递的参数(ParameterCollector)。如果您需要在许多子任务的情况下将参数传递给子任务,这将避免在每个单个任务上定义参数。

      class ParameterCollector(object):
          param1 = luigi.Parameter()
          param2 = luigi.Parameter()
      
          def collect_params(self):
              return {'param1': self.param1, 'param2': self.param2}
      
      
      class TaskB(ParameterCollector, luigi.Task):
          def requires(self):
              return []
      
          def output(self):
              return luigi.LocalTarget('/tmp/task1_success')
      
          def run(self):
              with self.output().open('w') as f:
                  f.write(self.param1)
      
      
      class TaskA(ParameterCollector, luigi.Task):
          def requires(self):
              a = TaskB(**self.collect_params())
              print(a)
              return a
      
          def output(self):
              return luigi.LocalTarget('/tmp/task2_success')
      
          def run(self):
              with self.output().open('w') as f:
                  f.write(str([self.param1, self.param2]))
      
      
      if __name__ == '__main__':
          luigi.run()
      

      【讨论】:

        【解决方案3】:

        好的,所以我发现这在 python 3.5 中可以正常工作(并且问题在 3.4 中仍然存在)。

        今天没有时间深入了解,所以没有更多细节。

        【讨论】:

          【解决方案4】:

          您可以使用以下内容。

          class DownloadFile(luigi.Task):
              """This is file download task used to get
               the file from file parser"""
              id = luigi.Parameter(default=uuid.uuid4().__str__(), positional=True)
              #master_task_id = luigi.Parameter(positional=True)
              pipeline_data = luigi.DictParameter(default={}, significant=False,
                                              
              visibility=luigi.parameter.ParameterVisibility.PRIVATE,
                                              positional=True)
          def output(self):
              # add the file output here
              return MockTarget("FileDownload", mirror_on_stderr=True)
          
          def run(self):
              time.sleep(3)
              parsed_data = self.pipeline_data['pipeline_data'] #this helps in parsing the dict
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多