【发布时间】:2019-10-03 16:49:51
【问题描述】:
我正在尝试创建一个接收 3 个文件的管道,从每个文件中获取 n 行(由 obs_num 表示)将文件中的每个值与 0 到 1 之间的随机浮点数进行比较,然后返回 obs_num如果它大于随机数,否则为假。然后我将这些值附加到一个列表(列表 1)
然后我看下第二个文件,检查obs_num的位置,如果上一个文件中相同位置的num返回false返回false,否则再检查num是否大于随机float,我然后对第三个文件执行相同的操作。我还将这些值附加到列表(列表 2 和 3)
然后我将这 3 个列表转换为一个数据框,每个列表代表一列。
然而问题是,当我运行我的管道时,输出是一个包含一个空白列的文件,而不是一个行等于 obs_num 的 csv。
这是我用于包装器的代码:
import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
def requires(self):
file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
size = 10
for j in range(1,int(size)):
return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
def run(self):
print('The wrapper is complete')
pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
def output(self):
return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
luigi.build([wrapper()],workers=8,local_scheduler=True)
状态机:
import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
file_tag = luigi.Parameter()
obs_nums = luigi.Parameter() #directly get element - don't write to file
size = luigi.Parameter()
def run(self):
path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
file = path+self.file_tag+'sampleprobs.csv'
def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
if file.split('/')[7][:4] == tag:
state_machine = pd.read_csv(file)
return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
session_to_leads = []
lead_to_opps = []
opps_to_comp = []
session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
with self.output().open('w') as out_csv:
out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')
我问过这个问题的类似版本,但每次都改变了,我已经设法解决了大部分最初的问题——所以这不是对以前问题的重复
因此,根据我的理解,在这种情况下,这应该会生成 3 个状态机文件,每个文件都有 10 行,用于每次观察和进行的比较。
这三个文件实际上是具有 2 列的文件,第一个是索引,第二个是 0 到 1 之间的概率
我不确定这是否是我的代码逻辑或我如何使用 Luigi 的问题
【问题讨论】:
-
如果您可以将其编辑为简化的错误示例,那就太好了。问题甚至可能不是路易吉。分解它并为自己制作小型测试用例。可能开始单元测试。
-
另外,您似乎遇到了格式错误。
with和output与它们应该在的位置不一致。