【Question Title】: Select a part of a dataframe every time, in parallel
【Posted】: 2022-09-27 16:41:53
【Question】:

I want to build a dictionary in a loop.

Since each iteration only takes a slice of the initial dataframe (df_train = df[df['CLASS'] == oneClass]), I want to run the iterations in parallel.

My code is:

import pandas as pd
import numpy as np
from multiprocessing import Pool

df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})


def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data_cols = df_train.select_dtypes(include=np.number).columns.difference(['CLASS'])
    numeric_only_data = df_train[numeric_only_data_cols]

    X = numeric_only_data.values
    x = X * 100

    orig_columns = numeric_only_data.loc[:,
                                         numeric_only_data.columns != 'CLASS'].columns

    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS']

    return new_df
        

new_df = {}
classes = np.unique(df['CLASS'])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, classes):
        new_df['new_dataframe'] = new_dataframe
        pool.close()
        pool.join()
        

I left this for loop out of the function:

new_df = {}
for oneClass in classes:
    df_train = df[df['GROUP_DESC'] == oneClass]
    ...

Now I get:

make_dataframes() missing 1 required positional argument: 'oneClass'

I'm not sure how to pass the function's arguments, or whether classes is even a valid argument for map.

    Tags: python-3.x pandas multithreading multiprocessing


    【Solution 1】:

    Are you planning to run this code inside a cluster? If not, you are probably better off executing it the good old single-process way. I find Raymond Hettinger's excellent talk on this topic very interesting, and I recommend checking it out: Raymond Hettinger, Keynote on Concurrency, PyBay 2017.

    That said, a simple workaround for your implementation is to give make_dataframes a single input argument that represents a tuple of df and oneClass:

    import pandas as pd
    import numpy as np
    from multiprocessing import Pool
    
    
    def make_dataframes(args):
    
        new_df = {}
    
        df = args[0]        # <--- Unpacking values
        oneClass = args[-1] # <--- Unpacking values
    
        df_train = df[df['CLASS'] == oneClass]
        numeric_only_data = df_train.select_dtypes(include=np.number).loc[:, lambda xdf: xdf.columns.difference(['CLASS'])]
    
        X = numeric_only_data.values
        x = X * 100
        
        orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
        new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
        new_df[oneClass]['CLASS'] = df_train['CLASS']
        return new_df
    
    
    df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
    new_df = {}
    classes = np.unique(df["CLASS"])
    with Pool(4) as pool:
        for new_dataframe in pool.map(make_dataframes, zip([df]*len(classes), classes)):
            new_df[list(new_dataframe.keys())[0]] = list(new_dataframe.values())[0]
    
    
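    A side note (my addition, not part of the original answer): instead of packing df and oneClass into a tuple and unpacking them by hand, the extra argument can be bound with functools.partial, so pool.map only has to supply oneClass. A minimal sketch that keeps the two-parameter signature (the function body is condensed relative to the original):

    ```python
    import pandas as pd
    import numpy as np
    from functools import partial
    from multiprocessing import Pool


    def make_dataframes(df, oneClass):
        # Select the rows of one class and scale its numeric columns by 100.
        df_train = df[df['CLASS'] == oneClass]
        out = df_train.select_dtypes(include=np.number) * 100
        out['CLASS'] = df_train['CLASS']
        return {oneClass: out}


    if __name__ == '__main__':
        df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5],
                           'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
        classes = np.unique(df['CLASS'])
        with Pool(4) as pool:
            # partial(make_dataframes, df) is a one-argument callable,
            # so it fits pool.map directly; df is pickled once per task.
            results = pool.map(partial(make_dataframes, df), classes)
        new_df = {k: v for d in results for k, v in d.items()}
    ```

    Pool.starmap(make_dataframes, zip([df]*len(classes), classes)) would work equally well; both just shift the argument packing from the function body to the call site.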

    The second approach is to use the Joblib package instead of multiprocessing, like so:

    
    import pandas as pd
    import numpy as np
    from joblib import Parallel, delayed
    
    
    def make_dataframes(df, oneClass):
        new_df = {}
        df_train = df[df["CLASS"] == oneClass]
        numeric_only_data = df_train.select_dtypes(include=np.number).loc[
            :, lambda xdf: xdf.columns.difference(["CLASS"])
        ]
    
        X = numeric_only_data.values
        x = X * 100
    
        orig_columns = numeric_only_data.loc[
            :, numeric_only_data.columns != "CLASS"
        ].columns
        new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
        new_df[oneClass]["CLASS"] = df_train["CLASS"]
        return new_df
    
    
    df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
    
    classes = np.unique(df["CLASS"])
    
    new_df = {
        key: value
        for parallel in Parallel(n_jobs=4)(
            delayed(make_dataframes)(df, i) for i in classes
        )
        for key, value in parallel.items()
    }
    
    
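    Another note on the Joblib variant (my addition): Parallel accepts prefer="threads", which runs the tasks in a thread pool inside the current process instead of spawning workers, so df is never pickled. For a frame this small, that serialization and worker-startup overhead is most of the cost. A sketch, with the function body condensed:

    ```python
    import pandas as pd
    import numpy as np
    from joblib import Parallel, delayed


    def make_dataframes(df, oneClass):
        # Same per-class transformation as above, condensed.
        df_train = df[df['CLASS'] == oneClass]
        out = df_train.select_dtypes(include=np.number) * 100
        out['CLASS'] = df_train['CLASS']
        return {oneClass: out}


    df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5],
                       'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
    classes = np.unique(df['CLASS'])

    # prefer="threads" keeps the work in one process: no pickling of df,
    # no interpreter startup for worker processes.
    results = Parallel(n_jobs=4, prefer="threads")(
        delayed(make_dataframes)(df, c) for c in classes
    )
    new_df = {k: v for d in results for k, v in d.items()}
    ```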

    Finally, the approach I'd recommend if you are not going to run this code inside a power-hungry cluster, and you need to extract every last drop of performance out of it:

    
    import pandas as pd
    import numpy as np
    from joblib import Parallel, delayed
    
    
    def make_dataframes(df, oneClass):
        new_df = {}
        df_train = df[df["CLASS"] == oneClass]
        numeric_only_data = df_train.select_dtypes(include=np.number).loc[
            :, lambda xdf: xdf.columns.difference(["CLASS"])
        ]
    
        X = numeric_only_data.values
        x = X * 100
    
        orig_columns = numeric_only_data.loc[
            :, numeric_only_data.columns != "CLASS"
        ].columns
        new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
        new_df[oneClass]["CLASS"] = df_train["CLASS"]
        return new_df
    
    
    df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
    classes = np.unique(df["CLASS"])
    new_df = {c: make_dataframes(df, c)[c] for c in classes}
    
    

    For comparison, I recorded the execution time of each approach:

    • multiprocessing: CPU times: user 13.6 ms, sys: 41.1 ms, total: 54.7 ms. Wall time: 158 ms
    • joblib: CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms. Wall time: 16.5 ms
    • Serial processing: CPU times: user 14.1 ms, sys: 797 µs, total: 14.9 ms. Wall time: 14.9 ms

    Running things in parallel adds a lot of communication overhead between the different processing nodes. Besides, it is an intrinsically more complex task than running serially, so developing and maintaining the code becomes harder and more expensive. If running in parallel is the number-one priority, I would recommend first ditching Pandas and using PySpark or Dask instead.
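    One more plain-pandas note (my addition): the per-class split itself doesn't even need an explicit loop over np.unique. DataFrame.groupby yields exactly the (class, sub-frame) pairs, and for a transformation this light it is usually the simplest serial option:

    ```python
    import pandas as pd
    import numpy as np

    df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5],
                       'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})

    new_df = {}
    # groupby iterates over (key, sub-frame) pairs, one per distinct CLASS.
    for one_class, group in df.groupby('CLASS'):
        out = group.select_dtypes(include=np.number) * 100
        out['CLASS'] = group['CLASS']
        new_df[one_class] = out
    ```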

    【Discussion】:
