【问题标题】:luigi batch module used in is straight batch Tasksluigi 中使用的批处理模块是直接批处理任务
【发布时间】:2017-10-28 15:31:06
【问题描述】:

我有 500 个要下载的链接,并希望按例如 10 个项目对它们进行批处理。

这个伪代码会是什么样子?

class BatchJobTask(luigi.Task)
    items = luigi.Parameter()
    def run(self):
        listURLs = []
        with ('urls_chunk', 'r') as urls
            for line in urls:
                listURLs.append('http://ggg'+line+'.org')
            10_urls = listURLs[0:items] #10 items here
            for i in 10_urls:
                req = request.get(url)
                req.contents
    def output(self):
        return self.LocalTarger("downloaded_filelist.txt")

class BatchWorker(luigi.Task)
    def run(self)
        # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc...

会怎么样?

【问题讨论】:

  • 您的网址列表在哪里?
  • 我已经更新了第一篇文章
  • 我的意思是这个 url 列表存储在哪里?在队列、数据库、文件中?你所要做的就是弄清楚那个东西有多少,然后从那里建立你的块。我将在下面举一个例子,但它不太可能与您的问题相关,因为您没有指定问题的相关部分。

标签: python parallel-processing luigi


【解决方案1】:

这是一种执行您想要的操作的方法,但将字符串列表存储为文件中的单独行。

import luigi
import requests

BATCH_SIZE = 10


class BatchProcessor(luigi.Task):
    items = luigi.ListParameter()
    max = luigi.IntParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('processed'+str(max)+'.txt')

    def run(self):
        for item in self.items:
            req = requests.get('http://www.'+item+'.org')
            # do something useful here
            req.contents
        open("processed"+str(max)+".txt",'w').close()


class BatchCreator(luigi.Task):
    file_with_urls = luigi.Parameter()

    def requires(self):
        required_tasks = []
        f = open(self.file_with_urls)
        batch_index = 0
        total_index = 0
        lines = []
        while True:
            line = f.readline()
            if not line: break
            total_index += 1
            if batch_index < BATCH_SIZE:
                lines.append(line)
                batch_index += 1
            else:
                required_tasks.append(BatchProcessor(batch=lines))
                lines = [line]
                batch_index = 1
        return required_tasks

    def output(self):
        return luigi.LocalTarget(str(self.file_with_urls) + 'processed')

    def run(self):
        open(str(self.file_with_urls) + 'processed', 'w').close()

【讨论】:

  • 不要在python中使用max作为变量。另外,需要参考self.max获取本地参数值;此代码不会运行。
【解决方案2】:

我这样做了。

class GetListtask(luigi.Task)
    def run(self):
        ...
    def output(self):
    return luigi.LocalTarget(self.outputfile)

class GetJustOneFile(luigi.Task):
    fid = luigi.IntParameter()
    def requires(self):
        pass

    def run(self):
        url = 'http://my-server.com/test' + str(self.fid) + '.txt'
        download_file = requests.get(url, stream=True)
        with self.output().open('w') as downloaded_file:
            downloaded_file.write(str(download_file.content))

    def output(self):
        return luigi.LocalTarget("test{}.txt".format(self.fid))


class GetAllFiles(luigi.WrapperTask):
    def requires(self):
        listoffiles = []  # 0..999
        for i in range(899):
            listoffiles.append(i)
        return [GetJustOneFile(fid=fileid) for fileid in listoffiles]

这段代码很糟糕吗?

【讨论】:

  • 嗯,它不做批处理,但它应该工作。
  • 如何在 GetAllFiles 中从 GetListTask 输入文件而不是预定义列表?
  • 这就是我在 BatchCreator 任务的 requires 方法中展示的内容,假设您有一个文件,其中文件的每一行都是不同的 URN 组件。
猜你喜欢
  • 2015-02-23
  • 2010-12-17
  • 1970-01-01
  • 2019-08-22
  • 2014-07-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多