【发布时间】:2019-07-15 01:35:10
【问题描述】:
我正在寻找 Python 中复杂 for 循环的并行处理,但不知道如何将其应用于我的案例。假设我有一个文件input.txt,如下所示:
Group Process Category Type Var1 Var2 Var3
A 3 cat1 type1 86.84 2.913 0.01096
A 3 cat1 type1 103.39 2.835 0.00564
A 3 cat1 type1 109.00 1.478 0.00365
A 3 cat1 type1 107.30 2.979 0.00631
A 3 cat1 type1 123.09 2.424 0.00531
A 3 cat1 type1 111.98 7.462 0.00332
A 841 cat2 type2 87.62 3.049 0.01195
A 841 cat2 type2 87.40 4.781 0.00930
A 841 cat2 type2 88.53 3.025 0.00697
A 841 cat2 type2 85.84 2.703 0.00697
理想情况下,我想使用四个定义的函数对Group、Process、Category 和Type 进行分组,并对Var1、Var2 和Var3 进行一些计算,其中三个也包含for 循环。实现中的output 如下:
Group Type Process Category Var1 Var2 Var3
0 A type1 3 cat1 101.207332 13.997181 106.30899
1 A type2 841 cat2 87.431341 3.584393 106.30899
完整的实现代码如下:
import pandas as pd
import numpy as np
from dplython import X, sift, DplyFrame, mutate, select
from plydata import define, group_by, summarize
def weightedMean(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
ny = data['Var3'][i]
nx = nx + ny
mx=(mx*nx+my*ny)/(nx+ny)
return(mx)
def summation(data):
length = len(data['Var3'])
cx = data['Var3'][0]
for i in range(1,length):
cy = data['Var3'][i]
cx = cx + cy
return(cx)
def sd_c(x_m, x_s, x_n, y_m, y_s, y_n):
al = x_n+y_n
tmp_sd = al*((x_n-1)*(x_s*x_s)+(y_n-1)*(y_s*y_s))+y_n*x_n*(x_m-y_m)*(x_m-y_m)
var = tmp_sd/(al*(al-1))
std = np.sqrt(var)
return(std)
def sd_pooled(data):
length = len(data['Var1'])
if length == 1:
mx = data['Var1']
return(length)
else:
mx = data['Var1'][0]
sx = data['Var2'][0]
nx = data['Var3'][0]
for i in range(1,length):
my = data['Var1'][i]
sy = data['Var2'][i]
ny = data['Var3'][i]
sx = sd_c(mx, sx, nx, my, sy, ny)
nx = nx + ny
mx = (mx*nx + my*ny)/(nx + ny)
return(sx)
dat = pd.read_csv("input.txt",sep="\t")
dat_name = dat.loc[:,'Type'].unique()
dat = DplyFrame(dat)
out = pd.DataFrame([])
for i in range(len(dat_name)):
df = (dat >>
sift(X.Type == dat_name[i]) >>
mutate(Var3 = X.Var3*3021) >>
sift(X.Var2 < 50))
out = out.append(df)
out_grouped = out.groupby(['Group', 'Type', 'Process', 'Category'])
init = []
mean = []
stdv = []
freq = []
kmer = []
for name, group in out_grouped:
group = pd.DataFrame(group).reset_index()
nm = name
wm = weightedMean(group)
sd = sd_pooled(group)
fq = summation(group)
init.append(nm)
mean.append(wm)
freq.append(fq)
stdv.append(sd)
init = pd.DataFrame(init)
mean = pd.DataFrame(mean)
freq = pd.DataFrame(freq)
stdv = pd.DataFrame(stdv)
init.rename(columns={0:'Group',1:'Type',2:'Process',3:'Category'}, inplace=True)
mean.rename(columns={0:'Var1'}, inplace=True)
stdv.rename(columns={0:'Var2'}, inplace=True)
freq.rename(columns={0:'Var3'}, inplace=True)
output = pd.concat([init.reset_index(drop=True), mean, stdv, freq], axis=1)
在这种情况下,如何使用多核应用并行处理?提前致谢。
【问题讨论】:
-
如何为每一行获取
count?您的求和和加权平均函数引用了一个名为count的键,您能否提供一个高级视图,说明如何从Var1、Var2和Var3计算每一行的该值 -
@ThalishSajeed 我很抱歉。
count实际上是Var3。我已经更正了错字。 -
因此,您希望将表格按 group 、 process 、 category 和 type 分组,然后在每一行上应用您的函数。我对么?分组时如何聚合 var 变量?我只是将它们相加吗?
-
我已经设法
groupby并应用了这些功能。我从Type获得了dat_name的列表,并通过for循环对数据进行子集化运行。我希望对此for循环应用并行处理,但我找不到运行复杂for循环的方法。我想知道在foreach和doParallelR 包中是否有任何可用的模块和功能等效于%dopar%?对于令人困惑的问题,我很抱歉。 -
有更好的方法来实现并行处理。一旦我知道我是否正确理解了您的问题,我可以建议他们。如果我对问题的理解正确,你能回答吗?
标签: python multithreading for-loop parallel-processing