【问题标题】:Weird behaviour with numpy and multiprocessing.processnumpy 和 multiprocessing.process 的奇怪行为
【发布时间】:2018-08-14 18:40:09
【问题描述】:

抱歉,代码太长了,我已尝试使其尽可能简单且可重现。

简而言之,这个 python 脚本启动了四个将数字随机分配到列表中的进程。然后,将结果添加到multiprocessing.Queue()

import random
import multiprocessing
import numpy
import sys

def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range (0, 4)]

    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print "after the queue.put"

jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)

for i in range(0, 4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

for j in jobs:
    j.join()

print "the end"

所有进程都运行print "after the queue.put" 行。但是,它没有到达print "the end" 行。很奇怪,如果我将arange10001 更改为1001,它就会结束。发生了什么?

【问题讨论】:

  • 我可以在 python3.6 上重新创建这种行为。我也不知道发生了什么。这种行为非常奇怪。一个简单的解决方法是在最后从队列中收集结果而不是加入。
  • @Jannick 我没有得到“最后而不是加入”。你能描述一下那部分吗?

标签: python-2.7 numpy multiple-processes


【解决方案1】:

大多数子进程在 put 调用时被阻塞。 multiprocessing queue put

必要时阻止,直到有空闲插槽可用。

这可以通过在加入前添加对 queue.get() 的调用来避免。

另外,在多处理代码中,请通过以下方式隔离父进程:

if __name__ == '__main__':
    # main code here

Compulsory usage of if name==“main” in windows while using multiprocessing

【讨论】:

  • 我认为只有在队列中设置了最大大小时才会阻塞,这里不是这种情况。
【解决方案2】:

我会将我的评论扩展为一个简短的答案。由于我也不理解奇怪的行为,它只是一种解决方法。

第一个观察是,如果 queue.put 行被注释掉,代码会运行到最后,所以它一定是与队列有关的问题。结果实际上是添加到队列中的,所以问题一定出在队列和连接之间的相互作用上。

以下代码按预期工作

import random
import multiprocessing
import numpy
import sys
import time

def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range (4)]

    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)

    queue.put(result)
    print("after the queue.put")


jobs = []
queue = multiprocessing.Queue()

subarray = numpy.array_split(numpy.arange(1, 15001, dtype=numpy.uint64), 4)


for i in range(4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()

res = []
while len(res)<4:
    res.append(queue.get())

print("the end")

【讨论】:

    【解决方案3】:

    这就是原因:

    Joining processes that use queues

    请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈送”线程馈送到底层管道。 (子进程可以调用队列的cancel_join_thread()方法来避免这种行为。)

    这意味着每当您使用队列时,您需要确保所有已放入队列的项目最终都会在进程加入之前被移除。否则,您无法确定将项目放入队列的进程将终止。另请记住,非守护进程将自动加入。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-11-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-14
      相关资源
      最近更新 更多