【问题标题】:multiprocessing pool hanging and unable to break out of app多处理池挂起,无法脱离应用程序
【发布时间】:2012-02-18 23:27:18
【问题描述】:

我确定这是一个新手错误,但我无法弄清楚我在多处理方面做错了什么。我有这段代码(只是闲置,什么都不做)

if __name__ == '__main__':
    pool = Pool(processes=4)  
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

data 是一个列表([1,2,3,4,5]),我正在尝试将列表发送到要在多个 cpu 上完成的每个项目,但是当我将我的工作命令包装到一个函数中并发送这段代码它没有做任何事情(当我在没有上面代码的情况下调用函数本身时它工作正常)。所以我认为我使用错误的多处理(虽然我从网站上举了例子),有什么建议吗?

更新:我注意到当它用 control-c 冻结时我什至无法摆脱它。它总是可以摆脱我的错误程序。我查看了python2.5 multiprocessing Pool 并尝试遵循建议并在我的 if 语句中添加了导入但没有运气

Update2:对不起,感谢下面的答案,该命令有效,但它似乎没有终止程序或让我强制退出。

【问题讨论】:

  • 您的问题可能出在my_awesome_function 我认为您需要向我们展示该功能。
  • 我刚刚发布了代码和预期的结果(在我添加多处理之前有效)
  • 它太奇怪了,通常我会收到一个错误,它不会做任何事情并挂起。起初它会运行但不会退出(这是一个简单的例子),但是在我把我的完整代码放在没有运行之后它仍然不让我退出。我尝试将打印语句全部用于调试,但没有一个被触发(即使是 if 语句之外的那些

标签: python multiprocessing


【解决方案1】:

多处理不是线程。

你可能正在做类似这样的事情

data = {}

def new_awesome_function(a, b):
    data[a] = b

运行脚本后,数据没有改变。这是因为多处理使用程序的副本。您的函数正在运行,但它们在您的程序的副本中运行,因此对您的原始程序没有影响。

为了使用多处理,您需要从一个进程显式通信到另一个进程。线程是共享的,但多处理是共享的,除非您明确共享它。

最简单的方法是使用返回值:

def new_awesome_function(a, b):
    return a + b

result = pool.apply_async(new_awesome_function, (1, 2))
# later...
value = result.get()

请参阅 python 文档:http://docs.python.org/library/multiprocessing.html,了解队列、管道和管理器等其他方法。你不能做的是改变你的程序状态并期望它能够工作。

【讨论】:

  • 很高兴知道,但我不这样做。我有一个完全独立的功能。它接受两个变量,如果变量满足某些条件,则将它们添加到数据库中。该函数的唯一外部是我要发送的列表。我可能把它弄得太复杂了,实际上我想要的只是一种将列表项(如上例中的值和计数)发送到函数并让该函数在使用我的 cpu 时完成其工作的方法。
【解决方案2】:

我不知道您使用的是什么数据库,但您可能无法像那样在进程之间共享数据库连接。

在 linux 上,使用fork(),它会在您启动子进程时复制内存中的所有内容。但是,除非专门设计,否则套接字、打开文件和数据库连接之类的东西将无法正常工作。

在 Window 上,fork() 不可用,因此它将重新运行您的脚本。在您的情况下,这将非常糟糕,因为它会再次丢弃所有内容。您可以通过放入 if __name__ == '__main__': 位来防止这种情况发生。

您应该能够重新打开my_awesome_function 中的数据库连接,从而能够成功地与数据库进行交互。

说实话,这样做你不会获得任何速度。事实上,我希望这会更慢。看到数据库真的很慢。您的进程将花费大部分时间等待数据库。现在您只有多个进程在等待数据库,这确实不会改善这种情况。

但是数据库是用来存储东西的。只要您正在进行处理,您就应该在访问数据库之前在代码中真正做到这一点。您基本上是在使用数据库作为一个集合,并且您的代码使用 python 集合会更好。如果您确实需要将这些内容放入数据库,请在程序结束时进行。

【讨论】:

  • 谢谢温斯顿。我这样做是因为当我将它存储在 python dict 中时,由于 dict 变得非常大,我会遇到内存错误。我现在使用 MySQL 来存储它
  • 顺便说一句,我没有使用 Windows,只有 linux 和 mac 桌面。我将尝试将连接语句放在函数中,并查看它让程序运行。此外,它以前可以工作,但作为单个进程非常非常慢(但没有内存错误),所以这就是我尝试使用多进程的原因,因为每个进程都是独立的,我认为我可以生成许多进程来加快速度。跨度>
  • @Lostsoul,数据库是您内存问题的错误解决方案。我建议您在 codereview.stackexchange.com 上发布数据库前版本并寻求帮助以减少内存使用。
  • 你给了我一个好主意。谢谢温斯顿。我想我正在共享 mysql 命令(我不认为它是共享的,因为它是静态的)。我正在重写我的代码,看看我是否可以将所有内容都包含在函数本身中。关于内存,codereview 无济于事,我使用的数据非常大,因此它创建的 dict 变得很大。在测试情况下它工作正常,但当我使用更大的数据集时它会失败(它超过 10+gig 表)。分发它也会很痛苦,所以我认为我需要持久存储
  • @Lostsoul,代码审查可能会有所帮助,因为有人可能会指出不需要在内存中保留尽可能多状态的解决方案,或者是访问额外空间的更有效方法。数据库非常重量级,几乎可以肯定存在更好的解决方案。
【解决方案3】:

您的代码似乎对我有用:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

给了我:

0 1 start
1 2 start
2 3 start
3 4 start
1 2 end
0 1 end
4 5 start
2 3 end
3 4 end
4 5 end

是什么让你认为它不起作用?


编辑:尝试运行它并查看输出:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')
    return a + b

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    results = []
        for i, x in enumerate(data): 
        r = pool.apply_async(new_awesome_function, (i, x))
        results.append((i,r))
    pool.close()
    already = []
    while len(already) < len(data):
        for i,r in results:
            if r.ready() and i not in already:
                already.append(i)
                print(i, 'is ready!')
    pool.join()

我的是:

0 1 start
1 2 start
2 3 start
3 4 start
0 1 end
4 5 start
1 2 end
2 3 end
0 is ready!
3 4 end
1 is ready!
2 is ready!
3 is ready!
4 5 end
4 is ready!

【讨论】:

  • 我检查了输出,你的权利它确实有效(数据出来了)。但是完成后它不会退出程序。它只是挂起。
  • @Lostsoul:我猜是join() 阻止了一切。我在python-2.7python-3.2 上没有这个问题,但我没有python-2.5,所以我无法对其进行测试。原因可能是async results 不可用或其他原因,尝试在new_awesome_function 中添加特定返回,并可能以某种方式检查它们是否为ready() 或类似的东西。
  • 嗯,你让我想到了..我实际上没有任何结果要显示,可能是这样吗?我正在运行一个命令来处理数据并将其上传到数据库。我正在使用 python 2.6,但我可以尝试升级并查看它是否有效。
  • @Lostsoul:那是完全不同的事情(而且更困难),您需要同步对资源的访问。无论如何,上面的代码不应该挂起,尝试运行我的新编辑。
  • 您认为需要同步哪些资源?
猜你喜欢
  • 2020-04-11
  • 2016-11-19
  • 2012-08-31
  • 2017-02-08
  • 2013-02-25
  • 2020-08-12
  • 2020-07-28
  • 2019-01-29
  • 2015-03-23
相关资源
最近更新 更多