伴随 JD Long 的回答的简短评论。我发现如果组的数量非常大(比如数十万),并且您的应用功能正在做一些相当简单和快速的事情,那么将您的数据帧分成块并将每个块分配给一个工作人员以执行groupby-apply(串行)比并行 groupby-apply 和让工作人员从包含多个组的队列中读取要快得多。示例:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
所以我们的数据框看起来像:
a
0 3425
1 1016
2 8141
3 9263
4 8018
请注意,“a”列有很多组(想想客户 ID):
len(df.a.unique())
15000
对我们的组进行操作的函数:
def f1(group):
time.sleep(0.0001)
return group
启动一个池:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
做一个并行的groupby-apply:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
现在让我们添加一个列,将 df 划分为更少的组:
df['b'] = np.random.randint(0, 12, nrows)
现在只有 12 个,而不是 15000 个组:
len(df.b.unique())
12
我们将对 df 进行分区并对每个块执行 groupby-apply。
ppe = ProcessPoolExecutor(12)
包装乐趣:
def f2(df):
df.groupby('a').apply(f1)
return df
依次发送每个要操作的块:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
请注意,每组花费的时间量没有改变。相反,改变的是工作人员从中读取的队列的长度。我怀疑正在发生的事情是工作人员无法同时访问共享内存,并且不断返回以读取队列,从而相互踩踏。使用较大的块进行操作,工人返回的频率较低,因此这个问题得到了改善,整体执行速度更快。