【问题标题】:Architecture for luigi tasks with multiple inputs具有多个输入的 luigi 任务架构
【发布时间】:2016-11-28 11:48:15
【问题描述】:

我有一些泡菜文件,一个用于 2005 年到 2010 年之间的每个日期。每个文件都包含一个单词字典,其中包含该日期各自的频率。我还有一个“主文件”,其中包含整个期间的所有唯一单词。总共大约有 500 万字。

我需要获取所有数据并为每个单词生成一个 CSV 文件,每个日期将有一行。例如文件some_word.txt:

2005-01-01,0.0003
2005-01-02,0.00034
2005-01-03,0.008

我在使用 luigi 框架组织这个过程时遇到了麻烦。我当前的顶级任务需要一个单词,查找每个日期的相关频率并将结果存储在 CSV 文件中。我想我可以遍历我的主文件中的每个单词并使用该单词运行任务,但我估计这需要几个月甚至更长的时间。这是我的顶级AggregateTokenFreqs 任务的简化版本。

class AggregateTokenFreqs(luigi.Task):
    word = luigi.Parameter()

    def requires(self):
        pass  # not sure what to require here, master file?

    def output(self):
        return luigi.LocalTarget('data/{}.csv'.format(self.word))

    def run(self):
        results = []
        for date_ in some_list_of_dates:
            with open('pickles/{}.p'.format(date_), 'rb') as f:
                freqs = pickle.load(f)
                results.append((date_, freqs.get(self.word))

        # Write results list to output CSV file

【问题讨论】:

  • 您需要做什么正在进行的处理?例如,您是否计划在新一天的数据到达时重新运行日常流程?如果只需要运行一次,那么运行 luigi 可能没有意义。无论如何,您最好使用多处理。

标签: python-3.x parallel-processing luigi


【解决方案1】:

@MattMcKnight 说使用多处理可能会更好。但是,如果您想使用 Luigi,您可以这样做:

  • Luigi 具有您配置的工人的概念。这是并行运行不同任务的本地进程数。
  • 您可以对任务进行建模,而不是“循环”通过所有泡菜,将一个泡菜传递给任务(作为参数)。您必须将结果写入具有唯一名称的目录中的 TSV。
  • 有一个循环,为每个泡菜(日期)创建一个任务。配置工作人员的数量(即 5)。这样您就可以同时处理 5 个文件。
  • 您将需要一项额外的任务,将所有单独的 CSV 文件“合并”为一个。

希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 2020-05-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多