【发布时间】:2015-06-30 20:20:17
【问题描述】:
我正在使用multiprocessing python 模块。我有大约 20-25 个任务要同时运行。每个任务将创建一个约 20k 行的 pandas.DataFrame 对象。问题是,所有任务都执行得很好,但是当涉及到“加入”进程时,它就停止了。我已经尝试过使用“小型”DataFrame,它运行良好。为了说明我的观点,我创建了下面的代码。
import pandas
import multiprocessing as mp
def task(arg, queue):
DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
queue.put(DF)
print("DF %d stored" %arg)
listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]
for p in processes:
p.start()
for i,p in enumerate(processes):
print("joining %d" %i)
p.join()
results = [queue.get() for p in processes]
编辑:
使用 DF = pandas.DataFrame({"hello":range(10)}) 我一切都正确:“存储 DF 0”到“存储 DF 19”,与“加入”相同0”到“加入19”。
但是 DF = pandas.DataFrame({"hello":range(1000)}) 会出现问题:在存储 DF 时,加入步骤在“加入 3”之后停止。
感谢有用的提示:)
【问题讨论】:
-
那么输出是什么样子的呢?它是否说“存储 DF 0”到“存储 DF 19”(以任意顺序),并在某处混合了“加入 0”?还是有什么不同? (如果问题也仅发生在 3 个进程中,请考虑这样做,这样您就可以粘贴整个输出而不会压倒任何人……)
-
您在运行代码时查看过系统资源利用率吗?如果是内存问题,请查看stackoverflow.com/questions/8956832/…
-
另外,我不知道 Pandas 是否可以使用 NumPy 的多处理共享数组(google numpy-sharedmem),但如果可以,那可能比 pickling 20 更有效巨大的帧并将它们通过管道传递(这是
Queue在幕后所做的——熊猫的pickler 中可能存在错误或导致您的问题的其他东西……)。 -
另外,您是否验证过,如果您取出 Pandas(可能只是传递一个相同大小的字典,而不是由该字典制成的 DataFrame)问题就消失了?特别是如果您使用的是 Python 3.2 或更早版本,其中多处理存在一些已修复的错误……
-
@abarnert - 他使用的是 python 2.7,所以
range(1000)将是一个包含 1000 个整数的列表,可能足以填充底层管道缓冲区。
标签: python queue multiprocessing dataframe python-multiprocessing