[Posted]: 2018-10-04 18:33:19
[Problem description]:
This is the code I use to parallelize applying a function over the rows of a pandas.DataFrame:
import numpy as np
import pandas as pd
from pandas import DataFrame
from multiprocessing import cpu_count, Pool
from functools import partial

def parallel_applymap_df(df: DataFrame, func, num_cores=cpu_count(), **kargs):
    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    # apply_wrapper (defined elsewhere, not shown) applies func to each partition
    series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
    pool.close()
    pool.join()
    return series
It works on a subsample of 200,000 rows, but when I try the full 200,000,000 rows, I get the following error:
~/anaconda3/lib/python3.6/site-packages/multiprocess/connection.py in _send_bytes(self, buf)
    394         n = len(buf)
    395         # For wire compatibility with 3.2 and lower
--> 396         header = struct.pack("!i", n)
    397         if n > 16384:
    398             # The payload is large so Nagle's algorithm won't be triggered

error: 'i' format requires -2147483648 <= number <= 2147483647
raised by the line:
series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
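That `struct.pack("!i", n)` call frames each pipe message with a signed 32-bit length, so a single pickled payload of 2**31 bytes (2 GiB) or more cannot be sent between processes. A minimal reproduction of the limit:

```python
import struct

# multiprocessing frames each pipe message with a signed 32-bit length
struct.pack("!i", 2**31 - 1)  # largest payload size that fits the header

try:
    struct.pack("!i", 2**31)  # one byte more overflows the header
except struct.error as exc:
    print(exc)  # 'i' format requires -2147483648 <= number <= 2147483647
```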
This is strange, because a slightly different version, which I use to parallelize operations that are not vectorized in pandas (such as Series.dt.time), works on the same number of rows. Here is the version that works:
def parallel_map_df(df: DataFrame, func, num_cores=cpu_count()):
    partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
    df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
[Discussion]:
- I ran into a very similar problem. Did you find a solution/explanation?

Tags: python pandas multiprocess