【问题标题】:Parallelize apply after pandas groupby在 pandas groupby 之后并行化应用
【发布时间】:2014-11-29 00:46:19
【问题描述】:

我在groupby之后使用rosetta.parallel.pandas_easy并行化apply,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

但是,有没有人想出如何并行化返回 DataFrame 的函数?正如预期的那样,此代码对于 rosetta 失败。

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)

【问题讨论】:

    标签: python pandas parallel-processing


    【解决方案1】:

    编辑:为了在 pandas groupby 上获得更好的计算性能,您可以使用 numba 在运行时将您的代码编译成 C 代码并以 C 速度运行。如果你在groupby 之后应用的函数是纯numpy 计算,它会超级快(比这个并行化快很多)。

    您可以使用multiprocessingjoblib 来实现并行化。但是,如果组的数量很大并且每个组的DataFrame很大,则运行时间可能会更糟,因为您需要将这些组多次转移到CPU中。为了减少开销,我们可以先将数据分成大块,然后在这些块上并行计算。

    例如,假设您正在处理股票数据,您需要按代码对股票进行分组,然后计算一些统计数据。您可以先按代码的第一个字符(大块)进行分组,然后在这个虚拟组中执行操作:

    import pandas as pd
    from joblib import Parallel, delayed
    
    def group_func(dummy_group):
        # Do something to the group just like doing to the original dataframe.
        #     Example: calculate daily return.
        res = []
        for _, g in dummy_group.groupby('code'):
            g['daily_return']  = g.close / g.close.shift(1)
            res.append(g)
        return pd.concat(res)
    
    stock_data = stock_data.assign(dummy=stock_data['code'].str[0])
    
    Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))
    

    【讨论】:

      【解决方案2】:

      Ivan 的回答很棒,不过看起来可以稍微简化一下,也不需要依赖joblib:

      from multiprocessing import Pool, cpu_count
      
      def applyParallel(dfGrouped, func):
          with Pool(cpu_count()) as p:
              ret_list = p.map(func, [group for name, group in dfGrouped])
          return pandas.concat(ret_list)
      

      顺便说一句:这不能代替 any groupby.apply(),但它会涵盖典型情况:例如它应该涵盖案例 2 和案例 3 in the documentation,而您应该通过将参数 axis=1 提供给最终的 pandas.concat() 调用来获得案例 1 的行为。

      编辑: 文档已更改;旧版本可以在here找到,无论如何我复制粘贴下面的三个例子。

      case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels
      
      case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together
      
      case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together
      

      【讨论】:

      • @Keiku 不知道,我以前从未听说过 REPL...但是您是否尝试过 func = lambda x : x"? If this doesn't work either, I suggest you open a specific question. You should be able to reproduce just with applyParallel([('one', 1), ('two', 2)] , your_func)``
      • 感谢您的建议。看来我尝试重新启动控制台并解决了它。很抱歉给您添麻烦了。
      • 文档似乎不再提供示例。有人可以详细说明一下吗?
      【解决方案3】:

      根据this thread,我个人建议使用 dask。

      正如@chrisb 所指出的,在 python 中使用 pandas 进行多处理可能会产生不必要的开销。它也可能像多线程甚至单线程一样执行。

      Dask 是专门为多进程创建的。

      【讨论】:

        【解决方案4】:

        伴随 JD Long 的回答的简短评论。我发现如果组的数量非常大(比如数十万),并且您的应用功能正在做一些相当简单和快速的事情,那么将您的数据帧分成块并将每个块分配给一个工作人员以执行groupby-apply(串行)比并行 groupby-apply 和让工作人员从包含多个组的队列中读取要快得多。示例:

        import pandas as pd
        import numpy as np
        import time
        from concurrent.futures import ProcessPoolExecutor, as_completed
        
        nrows = 15000
        np.random.seed(1980)
        df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
        

        所以我们的数据框看起来像:

            a
        0   3425
        1   1016
        2   8141
        3   9263
        4   8018
        

        请注意,“a”列有很多组(想想客户 ID):

        len(df.a.unique())
        15000
        

        对我们的组进行操作的函数:

        def f1(group):
            time.sleep(0.0001)
            return group
        

        启动一个池:

        ppe = ProcessPoolExecutor(12)
        futures = []
        results = []
        

        做一个并行的groupby-apply:

        %%time
        
        for name, group in df.groupby('a'):
            p = ppe.submit(f1, group)
            futures.append(p)
        
        for future in as_completed(futures):
            r = future.result()
            results.append(r)
        
        df_output = pd.concat(results)
        del ppe
        
        CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
        Wall time: 17.9 s
        

        现在让我们添加一个列,将 df 划分为更少的组:

        df['b'] = np.random.randint(0, 12, nrows)
        

        现在只有 12 个,而不是 15000 个组:

        len(df.b.unique())
        12
        

        我们将对 df 进行分区并对每个块执行 groupby-apply。

        ppe = ProcessPoolExecutor(12)
        

        包装乐趣:

        def f2(df):
            df.groupby('a').apply(f1)
            return df
        

        依次发送每个要操作的块:

        %%time
        
        for i in df.b.unique():
            p = ppe.submit(f2, df[df.b==i])
            futures.append(p)
        
        for future in as_completed(futures):
            r = future.result()
            results.append(r)
        
        df_output = pd.concat(results) 
        
        CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
        Wall time: 12.4 s
        

        请注意,每组花费的时间量没有改变。相反,改变的是工作人员从中读取的队列的长度。我怀疑正在发生的事情是工作人员无法同时访问共享内存,并且不断返回以读取队列,从而相互踩踏。使用较大的块进行操作,工人返回的频率较低,因此这个问题得到了改善,整体执行速度更快。

        【讨论】:

        • 在我有 4 个物理内核的机器上,如果 f1 的延迟增加,我只能看到并行化的好处,否则串行有更好的时间。
        【解决方案5】:

        这似乎可行,虽然它确实应该内置在 pandas 中

        import pandas as pd
        from joblib import Parallel, delayed
        import multiprocessing
        
        def tmpFunc(df):
            df['c'] = df.a + df.b
            return df
        
        def applyParallel(dfGrouped, func):
            retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
            return pd.concat(retLst)
        
        if __name__ == '__main__':
            df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
            print 'parallel version: '
            print applyParallel(df.groupby(df.index), tmpFunc)
        
            print 'regular version: '
            print df.groupby(df.index).apply(tmpFunc)
        
            print 'ideal version (does not work): '
            print df.groupby(df.index).applyParallel(tmpFunc)
        

        【讨论】:

        • 你知道在pandas中加入并行化是否有进展吗?
        • 通过对函数进行小的修改,可以返回常规应用返回的分层索引:def temp_func(func, name, group): return func(group), name def applyParallel(dfGrouped, func): retLst, top_index = zip(*Parallel(n_jobs=multiprocessing.cpu_count())(delayed(temp_func)(func, name, group) for name, group in dfGrouped)) return pd.concat(retLst, keys=top_index) Dang,我不知道如何在 cmets 中发布代码...
        • 您应该能够通过将applyParallel 绑定到df 来使“理想版本”工作:from types import MethodType; df.applyParallel = MethodType(applyParallel, df)
        • 我已经尝试过这种方法,但它并没有使用所有可用的 cpu,它只使用 1 或 2,即使我有 8 - 有人发生过这种情况吗?
        • 小心,这最终会比单核版本慢!如果您向每个作业发送大量数据但只有很短的计算,那么开销不值得,并且最终会变慢。
        【解决方案6】:

        我有一个用于在 Pandas 中实现并行化的 hack。我将我的数据帧分成块,将每个块放入列表的元素中,然后使用 ipython 的并行位对数据帧列表进行并行应用。然后我使用 pandas concat 函数将列表重新组合在一起。

        但是,这并不普遍适用。它对我有用,因为我想应用于数据帧的每个块的功能大约需要一分钟。并且将我的数据拆分和组合起来并不需要那么长时间。所以这显然是一个杂牌。话虽如此,这里有一个例子。我正在使用 Ipython 笔记本,所以你会在我的代码中看到 %%time 魔法:

        ## make some example data
        import pandas as pd
        
        np.random.seed(1)
        n=10000
        df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                           'data' : np.random.rand(n)})
        grouped = df.groupby('mygroup')
        

        对于本例,我将根据上述 groupby 制作“块”,但这不一定是数据的分块方式。虽然这是一个很常见的模式。

        dflist = []
        for name, group in grouped:
            dflist.append(group)
        

        设置并行位

        from IPython.parallel import Client
        rc = Client()
        lview = rc.load_balanced_view()
        lview.block = True
        

        编写一个愚蠢的函数来应用我们的数据

        def myFunc(inDf):
            inDf['newCol'] = inDf.data ** 10
            return inDf
        

        现在让我们先串行然后并行运行代码。 串行优先:

        %%time
        serial_list = map(myFunc, dflist)
        CPU times: user 14 s, sys: 19.9 ms, total: 14 s
        Wall time: 14 s
        

        现在并行

        %%time
        parallel_list = lview.map(myFunc, dflist)
        
        CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
        Wall time: 1.56 s
        

        那么只需几毫秒就可以将它们合并回一个数据帧

        %%time
        combinedDf = pd.concat(parallel_list)
         CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
        Wall time: 300 ms
        

        我在我的 MacBook 上运行了 6 个 IPython 引擎,但您可以看到它将执行时间从 14 秒降至 2 秒。

        对于真正长时间运行的随机模拟,我可以通过使用 StarCluster 启动集群来使用 AWS 后端。然而,很多时候,我只在我的 MBP 上跨 8 个 CPU 进行并行化。

        【讨论】:

        • 我会用我的代码试试这个,谢谢。你能解释一下为什么 apply 不会自动并行化操作吗?似乎拥有 apply 函数的全部好处是避免循环,但如果它不对这些组执行此操作,有什么好处?
        • 由于 GIL,Python 中的并行化很困难的说法由来已久。请记住,apply 通常是语法糖,并且在它下面执行隐含循环。使用并行化有点棘手,因为并行化存在运行时成本,这有时会抵消并行化的好处。
        • parallel_list 是否缺少定义,因为它在此行出现错误name 'parallel_list' is not definedcombinedDf = pd.concat(parallel_list)
        • 伊万,很明显!我认为他的答案非常好。 hackity hack 比我的少得多。
        猜你喜欢
        • 1970-01-01
        • 2015-02-13
        • 2021-02-10
        • 2020-11-19
        • 1970-01-01
        • 2017-12-19
        • 2018-10-31
        • 1970-01-01
        • 2017-01-10
        相关资源
        最近更新 更多