【问题标题】:Python - weird behavior with multiprocessing - join does not executePython - 多处理的奇怪行为 - 连接不执行
【发布时间】:2015-06-30 20:20:17
【问题描述】:

我正在使用multiprocessing python 模块。我有大约 20-25 个任务要同时运行。每个任务将创建一个约 20k 行的 pandas.DataFrame 对象。问题是,所有任务都执行得很好,但是当涉及到“加入”进程时,它就停止了。我已经尝试过使用“小型”DataFrame,它运行良好。为了说明我的观点,我创建了下面的代码。

import pandas
import multiprocessing as mp

def task(arg, queue):
    DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
    queue.put(DF)
    print("DF %d stored" %arg)

listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]

for p in processes:
    p.start()

for i,p in enumerate(processes):
    print("joining %d" %i)
    p.join()

results = [queue.get() for p in processes]

编辑:

使用 DF = pandas.DataFrame({"hello":range(10)}) 我一切都正确:“存储 DF 0”到“存储 DF 19”,与“加入”相同0”到“加入19”。

但是 DF = pandas.DataFrame({"hello":range(1000)}) 会出现问题:在存储 DF 时,加入步骤在“加入 3”之后停止。

感谢有用的提示:)

【问题讨论】:

  • 那么输出是什么样子的呢?它是否说“存储 DF 0”到“存储 DF 19”(以任意顺序),并在某处混合了“加入 0”?还是有什么不同? (如果问题也仅发生在 3 个进程中,请考虑这样做,这样您就可以粘贴整个输出而不会压倒任何人……)
  • 您在运行代码时查看过系统资源利用率吗?如果是内存问题,请查看stackoverflow.com/questions/8956832/…
  • 另外,我不知道 Pandas 是否可以使用 NumPy 的多处理共享数组(google numpy-sharedmem),但如果可以,那可能比 pickling 20 更有效巨大的帧并将它们通过管道传递(这是Queue 在幕后所做的——熊猫的pickler 中可能存在错误或导致您的问题的其他东西……)。
  • 另外,您是否验证过,如果您取出 Pandas(可能只是传递一个相同大小的字典,而不是由该字典制成的 DataFrame)问题就消失了?特别是如果您使用的是 Python 3.2 或更早版本,其中多处理存在一些已修复的错误……
  • @abarnert - 他使用的是 python 2.7,所以range(1000) 将是一个包含 1000 个整数的列表,可能足以填充底层管道缓冲区。

标签: python queue multiprocessing dataframe python-multiprocessing


【解决方案1】:

这个问题在文档中解释,Pipes and Queues:

警告: 如上所述,如果子进程已将项目放入队列(并且尚未使用 JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都被刷新到管道。

这意味着如果您尝试加入该进程,您可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护子进程时可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。见Programming guidelines

使用管理器会起作用,但有很多更简单的方法可以解决这个问题:

  1. 首先从队列中读取数据,然后加入进程,而不是相反。
  2. 手动管理Queue(例如,使用JoinableQueuetask_done)。
  3. 只需使用Pool.map 而不是重新发明轮子。 (是的,Pool 所做的大部分工作对于您的用例来说并不是必需的,但它也不会妨碍您,而且好消息是,您已经知道它有效。)

我不会展示 #1 的实现,因为它太微不足道了,也不会展示 #2 的实现,因为它太痛苦了,但是对于 #3:

def task(arg):
    DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
    return DF

with mp.Pool(processes=20) as p:
    results = p.map(task, range(20), chunksize=1)

(在 2.7 中,Pool 可能在with 语句中不起作用;您可以将更高版本的multiprocessing 的端口安装回 2.7 关闭 PyPI,或者您可以手动创建池,然后 @ 987654333@ 它在 try/finally 中,如果它在 with 语句中不起作用,您将处理一个文件......)


您可能会问自己,为什么它在这一点上会失败,但使用较小的数字 - 甚至只是小一点?

该 DataFrame 的 pickle 刚刚超过 16K。 (列表本身要小一些,但如果您尝试使用 10000 而不是 1000,您应该会在没有 Pandas 的情况下看到同样的结果。)

所以,第一个孩子写入 16K,然后阻塞,直到有空间写入最后几百字节。但是在join 之后,您不会从管道中拉出任何东西(通过调用queue.get),并且在他们退出之前您不能join,直到您解除管道阻塞,他们才能这样做,所以它是经典的僵局。前4个有足够的空间通过,但5个没有空间。因为你有4个核心,大多数时候,前4个通过将是前4个。但偶尔#4会击败#3或什么的,然后您将无法加入#3。这种情况在 8 核机器上会更常见。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-10-09
    • 1970-01-01
    • 2021-05-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多