【发布时间】:2017-05-05 12:52:30
【问题描述】:
我必须在一行一行的基础上处理一个巨大的pandas.DataFrame(几十 GB),其中每行操作都相当长(几十毫秒)。所以我有了将帧分割成块并使用multiprocessing 并行处理每个块的想法。这确实加速了任务,但内存消耗是一场噩梦。
虽然每个子进程原则上应该只消耗一小部分数据,但它需要(几乎)与包含原始DataFrame 的原始父进程一样多的内存。即使在父进程中删除使用的部分也无济于事。
我写了一个最小的例子来复制这种行为。它唯一做的就是用随机数创建一个大的DataFrame,将它分成最多100行的小块,并在多处理期间简单地打印一些关于DataFrame的信息(这里通过大小为4的mp.Pool) .
并行执行的main函数:
def just_wait_and_print_len_and_idx(df):
"""Waits for 5 seconds and prints df length and first and last index"""
# Extract some info
idx_values = df.index.values
first_idx, last_idx = idx_values[0], idx_values[-1]
length = len(df)
pid = os.getpid()
# Waste some CPU cycles
time.sleep(1)
# Print the info
print('First idx {}, last idx {} and len {} '
'from process {}'.format(first_idx, last_idx, length, pid))
将DataFrame 分块的辅助生成器:
def df_chunking(df, chunksize):
"""Splits df into chunks, drops data of original df inplace"""
count = 0 # Counter for chunks
while len(df):
count += 1
print('Preparing chunk {}'.format(count))
# Return df chunk
yield df.iloc[:chunksize].copy()
# Delete data in place because it is no longer needed
df.drop(df.index[:chunksize], inplace=True)
还有主程序:
def main():
# Job parameters
n_jobs = 4 # Poolsize
size = (10000, 1000) # Size of DataFrame
chunksize = 100 # Maximum size of Frame Chunk
# Preparation
df = pd.DataFrame(np.random.rand(*size))
pool = mp.Pool(n_jobs)
print('Starting MP')
# Execute the wait and print function in parallel
pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
pool.close()
pool.join()
print('DONE')
标准输出如下所示:
Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE
问题:
主进程需要大约 120MB 内存。但是,池的子进程需要相同数量的内存,尽管它们只包含原始 DataFame 的 1%(大小为 100 的块与原始长度为 10000)。为什么?
我能做些什么呢?尽管我进行了分块,Python (3) 是否会将整个 DataFrame 发送到每个子进程?这是pandas 内存管理的问题还是multiprocessing 和数据酸洗的问题?谢谢!
如果您想自己尝试一下,可以简单地复制和粘贴整个脚本:
import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os
def just_wait_and_print_len_and_idx(df):
"""Waits for 5 seconds and prints df length and first and last index"""
# Extract some info
idx_values = df.index.values
first_idx, last_idx = idx_values[0], idx_values[-1]
length = len(df)
pid = os.getpid()
# Waste some CPU cycles
time.sleep(1)
# Print the info
print('First idx {}, last idx {} and len {} '
'from process {}'.format(first_idx, last_idx, length, pid))
def df_chunking(df, chunksize):
"""Splits df into chunks, drops data of original df inplace"""
count = 0 # Counter for chunks
while len(df):
count += 1
print('Preparing chunk {}'.format(count))
# Return df chunk
yield df.iloc[:chunksize].copy()
# Delete data in place because it is no longer needed
df.drop(df.index[:chunksize], inplace=True)
def main():
# Job parameters
n_jobs = 4 # Poolsize
size = (10000, 1000) # Size of DataFrame
chunksize = 100 # Maximum size of Frame Chunk
# Preparation
df = pd.DataFrame(np.random.rand(*size))
pool = mp.Pool(n_jobs)
print('Starting MP')
# Execute the wait and print function in parallel
pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
pool.close()
pool.join()
print('DONE')
if __name__ == '__main__':
main()
【问题讨论】:
-
有点旧 - 但仍然有效:stackoverflow.com/questions/10369219/… 基本上 - 你看到的 :) 可能不是“真的”;
-
好的,谢谢,这可能解释了它^^
-
我必须收回这一点,如果我使用所有 8 个内核(在我的实际问题中,几十 GB,父进程需要大约 22% 的 RAM,子进程也是如此)在某些情况下点所有的子进程吞下所有的内存,整个事情都爆炸了。如果我只使用 4 个核心,它需要两倍的时间,但会成功并且不会崩溃。所以虚拟内存确实会转化为物理内存:-(
-
@SmCaterpillar 我一直在密切关注您的示例。甚至整个变薄了主要DF的使用部分。但在我的情况下,消除使用的行一次只会将 DF 减少 40 行。此外,我无法像您那样利用块,因为必须以自定义方式对 DF 进行分块。很高兴得到您的想法:stackoverflow.com/questions/62545562/…
标签: python pandas memory multiprocessing python-3.5