【问题标题】:Problems producing desired luigi output产生所需 luigi 输出的问题
【发布时间】:2019-10-03 16:49:51
【问题描述】:

我正在尝试创建一个接收 3 个文件的管道,从每个文件中获取 n 行(由 obs_num 表示)将文件中的每个值与 0 到 1 之间的随机浮点数进行比较,然后返回 obs_num如果它大于随机数,否则为假。然后我将这些值附加到一个列表(列表 1)

然后我看下第二个文件,检查obs_num的位置,如果上一个文件中相同位置的num返回false返回false,否则再检查num是否大于随机float,我然后对第三个文件执行相同的操作。我还将这些值附加到列表(列表 2 和 3)

然后我将这 3 个列表转换为一个数据框,每个列表代表一列。

然而问题是,当我运行我的管道时,输出是一个包含一个空白列的文件,而不是一个行等于 obs_num 的 csv。

这是我用于包装器的代码:

import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
        size = 10
        for j in range(1,int(size)):
            return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
    def run(self):
        print('The wrapper is complete')
        pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

状态机:

import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
    file_tag = luigi.Parameter()
    obs_nums = luigi.Parameter() #directly get element - don't write to file
    size = luigi.Parameter()
    def run(self):
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        file = path+self.file_tag+'sampleprobs.csv'
        def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
            if file.split('/')[7][:4] == tag:
                state_machine = pd.read_csv(file)
                return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
        session_to_leads = []
        lead_to_opps = []
        opps_to_comp = []
        session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
        lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
        opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
        df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
    with self.output().open('w') as out_csv:
        out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
    return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')

我问过这个问题的类似版本,但每次都改变了,我已经设法解决了大部分最初的问题——所以这不是对以前问题的重复

因此,根据我的理解,在这种情况下,这应该会生成 3 个状态机文件,每个文件都有 10 行,用于每次观察和进行的比较。

这三个文件实际上是具有 2 列的文件,第一个是索引,第二个是 0 到 1 之间的概率

我不确定这是否是我的代码逻辑或我如何使用 Luigi 的问题

【问题讨论】:

  • 如果您可以将其编辑为简化的错误示例,那就太好了。问题甚至可能不是路易吉。分解它并为自己制作小型测试用例。可能开始单元测试。
  • 另外,您似乎遇到了格式错误。 withoutput 与它们应该在的位置不一致。

标签: python pandas luigi


【解决方案1】:

检查您的格式。在您的状态机文件中,您的 with 语句出于某种原因处于类级别,而 output 方法处于命名空间级别。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-07
    • 2022-01-21
    相关资源
    最近更新 更多