【问题标题】:Apply parallel processing for "for" loop on defined functions that contain "for" loop as well in Python对包含“for”循环的已定义函数以及Python中的“for”循环应用并行处理
【发布时间】:2019-07-15 01:35:10
【问题描述】:

我正在寻找 Python 中复杂 for 循环的并行处理,但不知道如何将其应用于我的案例。假设我有一个文件input.txt,如下所示:

Group   Process Category        Type    Var1    Var2    Var3
A       3       cat1    type1   86.84   2.913   0.01096
A       3       cat1    type1   103.39  2.835   0.00564
A       3       cat1    type1   109.00  1.478   0.00365
A       3       cat1    type1   107.30  2.979   0.00631
A       3       cat1    type1   123.09  2.424   0.00531
A       3       cat1    type1   111.98  7.462   0.00332
A       841     cat2    type2   87.62   3.049   0.01195
A       841     cat2    type2   87.40   4.781   0.00930
A       841     cat2    type2   88.53   3.025   0.00697
A       841     cat2    type2   85.84   2.703   0.00697

理想情况下,我想使用四个定义的函数对GroupProcessCategoryType 进行分组,并对Var1Var2Var3 进行一些计算,其中三个也包含for 循环。实现中的output 如下:

   Group   Type  Process Category        Var1       Var2       Var3
0     A  type1        3     cat1  101.207332  13.997181  106.30899
1     A  type2      841     cat2   87.431341   3.584393  106.30899

完整的实现代码如下:

import pandas as pd
import numpy as np
from dplython import X, sift, DplyFrame, mutate, select
from plydata import define, group_by, summarize

def weightedMean(data):
        length = len(data['Var1'])
        if length == 1:
                mx = data['Var1']
                return(length)
        else:
                mx = data['Var1'][0]
                nx = data['Var3'][0]
                for i in range(1,length):
                        my = data['Var1'][i]
                        ny = data['Var3'][i]
                        nx = nx + ny
                        mx=(mx*nx+my*ny)/(nx+ny)
                return(mx)

def summation(data):
        length = len(data['Var3'])
        cx = data['Var3'][0]
        for i in range(1,length):
                cy = data['Var3'][i]
                cx = cx + cy
        return(cx)

def sd_c(x_m, x_s, x_n, y_m, y_s, y_n):
        al = x_n+y_n
        tmp_sd = al*((x_n-1)*(x_s*x_s)+(y_n-1)*(y_s*y_s))+y_n*x_n*(x_m-y_m)*(x_m-y_m)
        var = tmp_sd/(al*(al-1))
        std = np.sqrt(var)
        return(std)

def sd_pooled(data):
        length = len(data['Var1'])
        if length == 1:
                mx = data['Var1']
                return(length)
        else:
                mx = data['Var1'][0]
                sx = data['Var2'][0]
                nx = data['Var3'][0]
                for i in range(1,length):
                        my = data['Var1'][i]
                        sy = data['Var2'][i]
                        ny = data['Var3'][i]
                        sx = sd_c(mx, sx, nx, my, sy, ny)
                        nx = nx + ny
                        mx = (mx*nx + my*ny)/(nx + ny)
                return(sx)

dat = pd.read_csv("input.txt",sep="\t")

dat_name = dat.loc[:,'Type'].unique()
dat = DplyFrame(dat)

out = pd.DataFrame([])
for i in range(len(dat_name)):
        df = (dat >>
                sift(X.Type == dat_name[i]) >>
                mutate(Var3 = X.Var3*3021) >>
                sift(X.Var2 < 50))
        out = out.append(df)
        out_grouped = out.groupby(['Group', 'Type', 'Process', 'Category'])
        init = []
        mean = []
        stdv = []
        freq = []
        kmer = []
        for name, group in out_grouped:
                group = pd.DataFrame(group).reset_index()
                nm = name
                wm = weightedMean(group)
                sd = sd_pooled(group)
                fq = summation(group)
                init.append(nm)
                mean.append(wm)
                freq.append(fq)
                stdv.append(sd)
        init = pd.DataFrame(init)
        mean = pd.DataFrame(mean)
        freq = pd.DataFrame(freq)
        stdv = pd.DataFrame(stdv)
        init.rename(columns={0:'Group',1:'Type',2:'Process',3:'Category'}, inplace=True)
        mean.rename(columns={0:'Var1'}, inplace=True)
        stdv.rename(columns={0:'Var2'}, inplace=True)
        freq.rename(columns={0:'Var3'}, inplace=True)

output = pd.concat([init.reset_index(drop=True), mean, stdv, freq], axis=1)

在这种情况下,如何使用多核应用并行处理?提前致谢。

【问题讨论】:

  • 如何为每一行获取count?您的求和和加权平均函数引用了一个名为 count 的键,您能否提供一个高级视图,说明如何从 Var1Var2Var3 计算每一行的该值
  • @ThalishSajeed 我很抱歉。count 实际上是Var3。我已经更正了错字。
  • 因此,您希望将表格按 group 、 process 、 category 和 type 分组,然后在每一行上应用您的函数。我对么?分组时如何聚合 var 变量?我只是将它们相加吗?
  • 我已经设法groupby 并应用了这些功能。我从Type 获得了dat_name 的列表,并通过for 循环对数据进行子集化运行。我希望对此for 循环应用并行处理,但我找不到运行复杂for 循环的方法。我想知道在foreachdoParallel R 包中是否有任何可用的模块和功能等效于%dopar%?对于令人困惑的问题,我很抱歉。
  • 有更好的方法来实现并行处理。一旦我知道我是否正确理解了您的问题,我可以建议他们。如果我对问题的理解正确,你能回答吗?

标签: python multithreading for-loop parallel-processing


【解决方案1】:

这里有很多东西要解压,所以请多多包涵。

我已将数据集存储到名为 dat 的 pandas 数据框中

    Group   Process Category    Type    Var1    Var2    Var3
0       A      3    cat1       type1    86.84   2.913   0.01096
1       A      3    cat1       type1    103.39  2.835   0.00564
2       A      3    cat1       type1    109.00  1.478   0.00365
3       A      3    cat1       type1    107.30  2.979   0.00631
4       A      3    cat1       type1    123.09  2.424   0.00531
5       A      3    cat1       type1    111.98  7.462   0.00332
6       A      841  cat2       type2    87.62   3.049   0.01195
7       A      841  cat2       type2    87.40   4.781   0.00930
8       A      841  cat2       type2    88.53   3.025   0.00697
9       A      841  cat2       type2    85.84   2.703   0.00697

下一步是在ProcessCategoryType 上对表进行分组(我暂时省略了Group,因为Group 似乎只有1 个唯一值)

我使用了.sum(),因为这就是您所说的希望在分组期间聚合Var 变量的方式。

out_grouped = dat.groupby(['Type', 'Process', 'Category']).sum()

这是分组后数据框的样子 -

                                     Var1   Var2    Var3
Type    Process Category            
type1   3         cat1               641.60 20.091  0.03519
type2   841       cat2               349.39 13.558  0.03519

下一步是您要在此聚合的每一行上应用函数 -

必须设计您的函数,以便计算仅依赖于数据帧的每个原子行。

您的加权平均函数不应窥视其他行进行计算。如果您确实有要求 - 然后将这些值作为预处理步骤在数据框的每一行中可用。

def weightedMean(data):
    l = data['Var1']
    m = data['Var2']
    n = data['Var3']
    wm = l*m*n/l+m+n
    return wm

最后一步是对数据帧的每一行应用weightedMean 函数。

这是在单核上执行此操作的方法-

out_grouped["weighted_mean"] = out_grouped.apply(weightedMean, axis=1)

要跨多个内核分配此计算 - 您可以使用 DASK API

P.S 我很确定我在解释您的问题陈述的方式上犯了一些错误。所以请随时纠正我,我会适当地修改代码。

这是应用函数后数据框的外观 -

                              Var1      Var2    Var3    weighted_mean
Type    Process Category                
type1   3       cat1          641.60    20.091  0.03519 20.833192
type2   841     cat2          349.39    13.558  0.03519 14.070296

【讨论】:

  • 嗨,塔利什。感谢你的回答。然而,我真正想要的是将 parallel multiprocessing 应用于几个已定义的函数。我已经看到了将其应用于每个单独定义的函数的示例,但是我可以一次对多个函数运行并行多处理吗?
  • 应用多处理时。您可以定义要运行代码的核心数。要走的路是使用所有核心在整个数据帧上应用一个函数,然后存储结果。应用第二个函数,依此类推。我的意思是您可以将相同的范例应用于多个功能。只需有一个辅助函数来调用所有其他函数并在该辅助函数上使用 apply。
猜你喜欢
  • 2018-08-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-18
  • 2021-05-24
  • 2012-12-12
  • 2017-09-11
  • 2020-02-03
相关资源
最近更新 更多