Are you planning on running this code inside a cluster? If not, then you are probably better off running it the good old single-process way. I found Raymond Hettinger's great talk on the subject quite interesting, and I recommend checking it out: Raymond Hettinger, Keynote on Concurrency, PyBay 2017.
That said, a straightforward fix for your implementation would be to define a single parameter as the input to make_dataframes, representing a tuple of df and oneClass:
import pandas as pd
import numpy as np
from multiprocessing import Pool

def make_dataframes(args):
    new_df = {}
    df = args[0]         # <--- Unpacking values
    oneClass = args[-1]  # <--- Unpacking values
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(['CLASS'])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS']
    return new_df
df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
new_df = {}
classes = np.unique(df["CLASS"])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, zip([df] * len(classes), classes)):
        new_df[list(new_dataframe.keys())[0]] = list(new_dataframe.values())[0]
    pool.close()
    pool.join()
A second approach would be to use the Joblib package instead of multiprocessing, like so:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"]
    return new_df
df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {
    key: value
    for parallel in Parallel(n_jobs=4)(
        delayed(make_dataframes)(df, i) for i in classes
    )
    for key, value in parallel.items()
}
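With Joblib you can also tune the backend. By default `Parallel` uses separate processes, which means df gets pickled and shipped to every worker; passing `prefer="threads"` switches to the threading backend, which shares memory and skips that serialization cost (at the price of the GIL, which matters less when most of the time is spent in NumPy/pandas internals). A sketch using a condensed per-class transform (the `scale_class` name and the simplified body are my own, not from the original code):

```python
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

def scale_class(df, oneClass):
    # Condensed per-class transform: scale numeric columns by 100.
    df_train = df[df["CLASS"] == oneClass]
    numeric = df_train.select_dtypes(include=np.number)
    out = pd.DataFrame(numeric.values * 100, columns=numeric.columns)
    out["CLASS"] = df_train["CLASS"].values
    return {oneClass: out}

df = pd.DataFrame({"a": [0, 1, 2], "b": [3, 4, 5],
                   "c": [6, 7, 8], "CLASS": ["A", "B", "C"]})
classes = np.unique(df["CLASS"])

# prefer="threads" avoids pickling df once per task; n_jobs=-1 uses all cores.
results = Parallel(n_jobs=-1, prefer="threads")(
    delayed(scale_class)(df, c) for c in classes
)
new_df = {k: v for r in results for k, v in r.items()}
```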
Finally, here is the approach I recommend using if you don't plan on running this code inside a power-hungry cluster and need to extract all the juice you can get from it:
import pandas as pd
import numpy as np

def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"]
    return new_df

df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 5], 'c': [6, 7, 8], 'CLASS': ['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {c: make_dataframes(df, c)[c] for c in classes}
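When the per-class work is as simple as scaling the numeric columns, you can often go one step further and drop the per-class loop entirely: scale once over the whole DataFrame, then split by class with groupby. A sketch of that idea, assuming the same toy DataFrame:

```python
import pandas as pd
import numpy as np

df = pd.DataFrame({"a": [0, 1, 2], "b": [3, 4, 5],
                   "c": [6, 7, 8], "CLASS": ["A", "B", "C"]})

# Scale every numeric column once, in a single vectorized pass.
scaled = df.copy()
num_cols = scaled.select_dtypes(include=np.number).columns
scaled[num_cols] = scaled[num_cols] * 100

# Then split into one DataFrame per class.
new_df = {c: g.reset_index(drop=True) for c, g in scaled.groupby("CLASS")}
```

This does the filtering once instead of once per class, which matters when the number of classes grows.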
For comparison, I have recorded each approach's execution time:
- multiprocessing: CPU times: user 13.6 ms, sys: 41.1 ms, total: 54.7 ms; Wall time: 158 ms
- joblib: CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms; Wall time: 16.5 ms
- Serial processing: CPU times: user 14.1 ms, sys: 797 µs, total: 14.9 ms; Wall time: 14.9 ms
Running things in parallel adds a lot of communication overhead between the different processing nodes. Besides that, it is an intrinsically more complex task than running things serially, so developing and maintaining the code becomes harder and more expensive. If running in parallel is the number-one priority, I would recommend first ditching Pandas and using PySpark or Dask instead.