我不知道 Java 的 FixedThreadPool,但我可以修复你的代码 ;-)
你显然不想使用res.get(),对吧?所以我会忽略那部分。 .apply_async() 的问题在于您没有正确调用它。我很惊讶没有提出任何例外!参数列表应该是一个元组,而不是一个列表(对于内置的 apply() 函数)。对于关键字参数参数,None 不起作用。如果您没有要传递的关键字参数,请将其省略(如下所示)或传递一个空字典 ({})。
此处的其他更改更具装饰性:引入了 IO 锁以防止终端输出被打乱,并引入了__name__ == "__main__" 检查以确保清晰,以便代码也可以在 Windows 上运行:
import multiprocessing, time
def getlock(lck):
global iolock
iolock = lck
def printStuff(number):
with iolock:
print number
if number % 2:
time.sleep(0.5)
return number*number
def execute():
def resultAggregator(n):
with iolock:
print 'aggregator called...'
a.append(n)
for i in range(34):
pool.apply_async(printStuff, (i,), callback=resultAggregator)
with iolock:
print "called for ", i
if __name__ == "__main__":
a = []
iolock = multiprocessing.Lock()
pool = multiprocessing.Pool(5, getlock, (iolock,))
execute()
pool.close()
pool.join()
print a
后来:错误
如果您为关键字参数传递None,实际上会引发异常 - 但multiprocessing 会抑制它。唉,这是异步噱头的一个常见问题:没有引发异常的好方法!它们发生在与您的“主程序”当时正在做什么无关的上下文中。
至少 Python 3.3.2 的 .apply_async() 实现也有一个可选的 error_callback 参数。不知道什么时候介绍的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略......)它们。添加此功能:
def ouch(e):
raise e
并将调用更改为:
pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)
产生以 ouch() 结尾的回溯,并带有以下异常详细信息:
TypeError: printStuff() argument after ** must be a mapping, not NoneType
因此,至少,使用足够新的 Python,您可以安排不让异步错误无形地传递。
问答
您能解释一下 getLock() 中的“全局 iolock”声明吗?我以为
为每个子进程定义一个全局变量,但将名称从
iolock 到 iiolock 中? "main" 使工作进程不知道 iolock。
抱歉,我无法从那个确切看出你做了什么。名称iolock 旨在成为所有 进程、主进程和子进程中的全局变量。那是因为我的代码中的所有进程使用名称iolock。
例如,如果“通过更改名称……”您的意思是您只是替换了
iolock = multiprocessing.Lock()
到
iiolock = multiprocessing.Lock()
那么你会得到一个例外:
Traceback (most recent call last):
...
pool = multiprocessing.Pool(5, getlock, (iolock,))
NameError: global name 'iolock' is not defined
如果您也将那行 (pool = ...) 也更改为使用 iiolock,那么当主进程中的 resultAggregator 尝试使用 iolock 时,您会遇到不同的异常:
Exception in thread Thread-3:
Traceback (most recent call last):
...
File "mpool.py", line 19, in resultAggregator
with iolock:
NameError: global name 'iolock' is not defined
所以我不知道你到底做了什么。
在执行中声明 printStuff 也会导致静默
错误(代码在打印“要求”之后没有进展)
那是行不通的。 Python 中的函数未声明 - def 是一个可执行 语句。在执行def printstuff 之前,printStuff 的代码不存在。因为只有主程序执行execute(),所以def'ed在execute()里面的函数只存在于主程序中。这是真的
pool.apply_async(printStuff, (i,), callback=resultAggregator)
传递 printStuff 给子进程,但是所有传递的东西都通过发送端的酸洗和接收端的解酸来工作,并且函数对象不能被酸洗。你确定没有收到这样的错误吗?:
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
(我在这里使用的是 Python 3 - 也许它在 Python 2 下有所不同)。
无论如何,Python 不是 Java - 不要疯狂地嵌套 - 保持简单 ;-) 子进程使用的每个函数和类都应该在模块级别定义(class 也是可执行文件Python 中的语句!Python 中唯一的“声明”是 global 和 nonlocal 语句)。
更多问答
你的假设是对的。我全部更改为 iiolock
主要以外的地方
仍然不知道究竟你做了什么。对于这样的事情,你真的必须发布代码,而不仅仅是描述你做了什么。我只能猜测 - 这真的很痛苦 ;-) 这个怎么样:如果你有一个新问题,打开一个新问题?
根据您在此处描述的内容(“在除 main 之外的所有位置”),您会在 execute() 中遇到异常,因为新的 iiolock 名称不会存在于主进程中(这是唯一的进程) execute() 运行 - 你说你确实没有改变了 main() 中的旧 iolock)。但是你没有提到一个例外,所以我猜你并没有真正做到你所说的那样(“在除 main 之外的所有地方”)。
并期望新进程获得相同的锁
作为参数传递给初始化函数,但每个都有它的
自己的全局 iiolock 变量。多个进程如何共享
无论如何都是相同的变量(内存内容是否不同
每个进程???)。
对此有两个答案 ;-) 最直接相关的一个是 iolock(在我的原始代码中 - 我真的不知道你的代码现在是什么样子)是由 multiprocessing 创建的对象(它是mp.Lock()) 并通过 mp.Pool() 传递给子进程:
pool = multiprocessing.Pool(5, getlock, (iolock,))
^^^^^^
mp 控制着这里的一切,并在幕后做一个世界性的事情,以确保这个mp.Lock() 在进程之间具有一致的状态。这不仅仅是任何旧变量,它是 mp 了解的所有内容以及 mp 实现的所有行为。
第二个答案,到目前为止不适用于本期的任何代码,您还可以使用mp 在“共享内存”中创建某些类型的数据。请参阅 mp.Value 和 mp.Array 和 multiprocessing.sharedctypes 的文档。这些值是真正(物理上)跨进程共享的。
但除了那些(由mp实现的对象,以及从mp获得的“共享内存”)之外,你是对的:没有其他值是共享的(无论是物理上的还是语义上的)跨进程。通过在不同的mp 同步点(如.put() mp.Queue 上的对象和另一个处理.get()s)。