Posted: 2020-01-07 21:00:41
Question:
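I have a Luigi task that fetches different data depending on the parameters passed to it, and a second task that takes those files and sends them to our data lake. My problem is that I'm not sure how to schedule several versions of the first task in sequence so that each result can then be sent to the data lake. Here is my code: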
import datetime
import ntpath

import luigi
from azure.storage.blob import ContainerClient

# ListkWriter, client_id, client_secret, storageCreds, listDict and startDate
# are defined elsewhere in the script (not shown).

class ListStep(luigi.Task):
    listId = luigi.IntParameter()
    startDate = luigi.Parameter(None)
    endDate = luigi.Parameter(None)
    logPath = luigi.Parameter(None)
    messagePath = luigi.Parameter(None)
    summaryPath = luigi.Parameter(None)
    contactPath = luigi.Parameter(None)
    logFile = luigi.BoolParameter(True)
    endpoint = luigi.Parameter()
    subscribed = luigi.BoolParameter(True)
    fileSuffix = luigi.Parameter(None)

    def output(self):
        today = datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        client = ListkWriter(client_id, client_secret, listId = self.listId, logPath = self.logPath, contactPath = self.contactPath, messagePath = self.messagePath, summaryPath = self.summaryPath)
        if self.endpoint == "message":
            filesList = client.getMessages(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
        elif self.endpoint == "contacts":
            filesList = client.getContacts(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix, subscribed = self.subscribed)
        elif self.endpoint == "summary":
            filesList = client.getSummary(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
        with self.output().open('w') as outfile:
            for val in filesList:
                outfile.write(",".join([val, datetime.datetime.now().strftime("%H:%M:%S")]))
                outfile.write("\n")

class Transfer(luigi.Task):
    step = luigi.TaskParameter()
    uploadPath = luigi.Parameter()

    def requires(self):
        return self.step

    def output(self):
        today = datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.step.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        containerClient = ContainerClient.from_connection_string(storageCreds, 'datalake')
        with self.input().open('r') as infile:
            fileUpload = infile.readlines()
        for file in fileUpload:
            pathFile = file.split(",")[0]
            fileName = ntpath.basename(pathFile)
            uploadName = self.uploadPath + fileName
            with open(pathFile, 'rb') as f:
                containerClient.upload_blob(uploadName, f)

path = "/test/"
mMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
mSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
luigi.run([Transfer(step = mMessage, uploadPath = path), Transfer(step = gMessage, uploadPath = path),
           Transfer(step = mSummary, uploadPath = path), Transfer(step = gSummary, uploadPath = path)],
          local_scheduler = True)
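Luigi's default `Task.complete()` treats a task as done once every target returned by `output()` exists. In the code above, `ListStep.output()` formats the file name with `client.listId` rather than `self.listId`, so if `client` refers to one shared object, the `m` and `g` ListSteps for the same endpoint would point at the same file. `Transfer.output()` also builds the same path as the ListStep it requires. A minimal sketch of a path helper that keys the file on every parameter that tells tasks apart; `pipeline_log_path` and its `prefix` argument are hypothetical names, not part of the original script or of Luigi:

```python
import datetime


def pipeline_log_path(list_id, endpoint, day=None, prefix="pipelinelog"):
    """Return a per-task log path keyed on every parameter that tells tasks apart.

    list_id and endpoint both appear in the file name, so ListStep runs for the
    'm' and 'g' lists never share a target; a different prefix gives a
    downstream task (e.g. the upload step) a marker file of its own.
    """
    day = day or datetime.date.today()
    return "./pipelinelog/{}_{}_{}_{}.csv".format(
        prefix, list_id, endpoint, day.strftime("%m%d%y"))
```

Inside `ListStep.output()` this would become `luigi.LocalTarget(pipeline_log_path(self.listId, self.endpoint))`, and Transfer could use something like `prefix="uploadlog"` so that its own completeness no longer depends on the file ListStep writes.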
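What I want is this: once the ListStep task has fetched the data (using the specified parameters), the Transfer task should send it to the data lake. When that first pair of operations is done, it should move on to the next set of objects. I don't want all of this to run in parallel; I want it to run sequentially. When I execute this script, the scheduler seems to count only 5 tasks instead of 8. It also runs the mMessage task and then the gMessage task without ever running a Transfer task. Later it crashes with a FileExistsError and prints the following summary: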
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 ran successfully:
    - 2 ListrakStep(...)
* 2 failed:
    - 2 ListrakStep(...)
* 1 were left pending, among these:
    * 1 was not granted run permission by the scheduler:
        - 1 Transfer(step=ListrakStep, destination=datalake, uploadPath=test/)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====
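The summary shows a single `Transfer(step=ListrakStep, ...)`, which appears to follow from how `luigi.TaskParameter` works: it is meant to hold a task class, and it serializes its value as the task family name. Four Transfer objects built from four ListStep instances would then have identical serialized parameters and therefore the same task id, so the scheduler keeps only one of them, giving 4 ListStep + 1 Transfer = 5 tasks. The plain-Python model below imitates that deduplication; `task_id` is a simplified stand-in rather than Luigi's real id format, and `"m"`/`"g"` are placeholders for the `listDict` values:

```python
import hashlib
import json


def task_id(family, params):
    """Simplified model of a task id: family name plus a hash of the
    serialized parameter values (not Luigi's exact format)."""
    blob = json.dumps(params, sort_keys=True)
    return "{}_{}".format(family, hashlib.md5(blob.encode()).hexdigest()[:10])


steps = [("message", "m"), ("message", "g"), ("summary", "m"), ("summary", "g")]

# A task-class parameter serializes only the class name, so every Transfer looks alike.
by_family = {task_id("Transfer", {"step": "ListStep", "uploadPath": "/test/"}) for _ in steps}

# Serializing the values that actually distinguish the steps keeps one id per step.
by_params = {task_id("Transfer", {"endpoint": e, "listId": k, "uploadPath": "/test/"})
             for e, k in steps}
```

Here `by_family` ends up with one id and `by_params` with four. In Luigi terms, one way to keep the Transfers distinct is to give Transfer the same parameters as ListStep and construct the ListStep in `requires()`; the `luigi.util.requires` decorator does that parameter copying.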
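Since the goal is strict sequencing rather than parallelism, one way to express it is a linear dependency chain: each transfer requires its own fetch, and each fetch requires the previous transfer. A single line of dependencies leaves a scheduler only one valid order. The sketch below models that chain in plain Python; the task names are placeholders, and in Luigi the same shape would mean `ListStep.requires()` returning the previous Transfer, which would need extra parameters to identify it (an assumption, not something in the original code):

```python
def build_chain(pairs):
    """Map each task name to the tasks it requires, for (fetch, transfer)
    pairs that must run strictly one after another.

    transfer_i requires fetch_i and fetch_i requires transfer_{i-1}, so the
    dependency graph is a single line with exactly one execution order.
    """
    deps = {}
    previous = None
    for fetch, transfer in pairs:
        deps[fetch] = [previous] if previous is not None else []
        deps[transfer] = [fetch]
        previous = transfer
    return deps, previous


def run_order(deps, target, done=None):
    """Depth-first walk: every requirement is listed before the task that needs it."""
    done = [] if done is None else done
    for dep in deps[target]:
        if dep not in done:
            run_order(deps, dep, done)
    if target not in done:
        done.append(target)
    return done


pairs = [("fetch_m_message", "transfer_m_message"),
         ("fetch_g_message", "transfer_g_message")]
deps, last = build_chain(pairs)
order = run_order(deps, last)
```

With such a chain only the last task has to be launched, since Luigi schedules requirements itself. Note that `luigi.run` expects command-line style arguments; the entry point that accepts a list of task instances is `luigi.build(tasks, local_scheduler=True)`.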
Tags: python-3.x luigi