【Question Title】: Chaining multiple tasks in Luigi
【Posted】: 2020-01-07 21:00:41
【Question】:

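I have a Luigi task set up to fetch different data depending on the parameters passed to it, and another task that takes those files and sends them to our data lake. My problem is that I'm not sure how to schedule multiple versions of the first task in sequence so that their files can then be sent to the data lake. Here is my code: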

class ListStep(luigi.Task):

    listId = luigi.IntParameter()
    startDate = luigi.Parameter(None)
    endDate = luigi.Parameter(None)
    logPath = luigi.Parameter(None)
    messagePath = luigi.Parameter(None)
    summaryPath = luigi.Parameter(None)
    contactPath = luigi.Parameter(None)
    logFile = luigi.BoolParameter(True)
    endpoint = luigi.Parameter()
    subscribed = luigi.BoolParameter(True)
    fileSuffix = luigi.Parameter(None)

    def output(self):
        today =  datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        client = ListkWriter(client_id, client_secret, listId = self.listId, logPath = self.logPath, contactPath = self.contactPath, messagePath = self.messagePath, summaryPath = self.summaryPath)

        if self.endpoint == "message":
            filesList = client.getMessages(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
        elif self.endpoint == "contacts":
            filesList = client.getContacts(startDate = self.startDate, endDate = self.endDate,  log = self.logFile, fileSuffix = self.fileSuffix, subscribed = self.subscribed)
        elif self.endpoint == "summary":
            filesList = client.getSummary(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)

        with self.output().open('w') as outfile:
            for val in filesList:
                outfile.write(",".join([val, datetime.datetime.now().strftime("%H:%M:%S")]))
                outfile.write("\n")

class Transfer(luigi.Task):

    step = luigi.TaskParameter()
    uploadPath = luigi.Parameter()

    def requires(self):
        return self.step

    def output(self):
        today =  datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.step.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        containerClient = ContainerClient.from_connection_string(storageCreds, 'datalake')
        with self.input().open('r') as infile:
            fileUpload = infile.readlines()

        for file in fileUpload:
            pathFile = file.split(",")[0]
            fileName = ntpath.basename(pathFile)
            uploadName = self.uploadPath + fileName
            with open(pathFile, 'rb') as f:
                containerClient.upload_blob(uploadName, f)

path = "/test/"

mMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
mSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)

luigi.run([Transfer(step = mMessage, uploadPath = path), Transfer(step = gMessage, uploadPath = path),
           Transfer(step = mSummary, uploadPath = path), Transfer(step = gSummary, uploadPath = path)],
          local_scheduler = True)

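What I'm trying to achieve is that once the data has been fetched by a ListStep task (with the specified parameters), it is sent to the data lake by the Transfer task. Once the first set of operations is done, I want it to move on to the next set of objects. My goal is not to have all of this run in parallel, but in sequence. When I execute this script, the scheduler seems to count only 5 tasks instead of 8. Also, it goes on to execute the mMessage task and then the gMessage task without executing the Transfer task. It later crashes with a FileExistError and outputs the following summary: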

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 ran successfully:
    - 2 ListrakStep(...)
* 2 failed:
    - 2 ListrakStep(...)
* 1 were left pending, among these:
    * 1 was not granted run permission by the scheduler:
        - 1 Transfer(step=ListrakStep, destination=datalake, uploadPath=test/)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

【Comments】:

    Tags: python-3.x luigi


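    【Solution 1】:

    First, unless your tasks require other tasks, you cannot guarantee that they will run in a particular order. You may have read about the priority system in the documentation, but that only determines what gets assigned to workers, not when workers finish their tasks. One way to sequence tasks is to use a task parameter and build a chain like this: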

    class SequenceTask(luigi.Task):
        prev_task = luigi.TaskParameter()
    
        def requires(self):
            return self.prev_task
        ...
    
    a = SequenceTask(...)
    b = SequenceTask(prev_task=a, ...)
    c = SequenceTask(prev_task=b, ...)
    

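    You are actually already doing this in your Transfer task (although I think you should instead require a ListStep parameterized by the Transfer task, but if it works, it works), so it should be easy to implement.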

    I don't believe the scheduler only found 5 tasks. It looks like it scheduled 5 but then did not grant the first Transfer task permission to run.

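    Note: the output of your ListStep is not correct and will cause you problems later. You need to compute the datetime outside of the output function; otherwise, if the date has changed by the time output is called again, the target path will be different and the task will never be considered complete.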
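
A minimal sketch of that note, in plain Python so it runs without Luigi. The helper name `pipeline_log_path` and the sample values are assumptions for illustration; in a real task the date would more likely be passed in as a parameter (for example a `luigi.DateParameter`) and `output()` would build the path from it.

```python
import datetime


def pipeline_log_path(list_id, endpoint, run_date):
    """Build the log path from an explicit date instead of datetime.now()."""
    return "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(
        list_id, endpoint, run_date.strftime("%m%d%y")
    )


# The date is fixed once, before any task is created (hypothetical values).
run_date = datetime.date(2020, 1, 7)

# However many times the path is computed, it stays the same for this run.
first_call = pipeline_log_path(123, "message", run_date)
second_call = pipeline_log_path(123, "message", run_date)
print(first_call == second_call)

# A date read after midnight would point at a different file: the failure mode.
next_day = pipeline_log_path(123, "message", run_date + datetime.timedelta(days=1))
print(first_call != next_day)
```

Because the date is an input rather than something read from the clock inside the function, every call made for the same run resolves to the same target.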

    【Discussion】:
