【问题标题】:How to implement Java FixedThreadPool using Python multiprocessing.Pool如何使用 Python multiprocessing.Pool 实现 Java FixedThreadPool
【发布时间】:2013-10-11 17:25:27
【问题描述】:

我需要在 Python 中使用进程池。要求如下:

池的大小固定为 10。我有许多作业要提交到池 (N > 10)。 在 Java 中,可以为此目的使用 FixedThreadPool。提交作业,一旦线程完成执行任务,客户端就可以提交下一个任务。所以如果当前有 10 个任务在运行,客户端无法提交第 11 个任务。但如果一个任务完成,客户端就可以将下一个任务提交给可用线程。

这是我用来测试一些想法的代码:

import multiprocessing, time


def printStuff(number):
    print number
    if number % 2 : time.sleep(0.5)
    return number*number

pool = multiprocessing.Pool(5, None, None, None)   
a = []

def execute():
    def resultAggregator(n):
        print 'aggregator called...'
        a.append(n)
    for i in range (0, 34):

        # With callback
        #pool.apply_async(printStuff, [i], None, resultAggregator)
        #print "called for ", i

        # Without callback
        res = pool.apply_async(printStuff, [i])
        print "called for" , i, "returned ", res.get()

    pool.close() # disable sumitting any more tasks
    pool.join() # wait for all the worker to finish

execute()
print a

res.get() 阻塞直到printStuff 返回。使用回调变体甚至不会调用printStuff。注意在这两种情况下a最后都是空的。

任何想法如何实现上述行为?代码 sn-ps 会很棒,但它足以指向一个我不知道的现有库函数的指针,或者只是折腾一些想法。

【问题讨论】:

    标签: python python-2.7 multiprocessing


    【解决方案1】:

    我不知道 Java 的 FixedThreadPool,但我可以修复你的代码 ;-)

    你显然不想使用res.get(),对吧?所以我会忽略那部分。 .apply_async() 的问题在于您没有正确调用它。我很惊讶没有提出任何例外!参数列表应该是一个元组,而不是一个列表(对于内置的 apply() 函数)。对于关键字参数参数,None 不起作用。如果您没有要传递的关键字参数,请将其省略(如下所示)或传递一个空字典 ({})。

    此处的其他更改更具装饰性:引入了 IO 锁以防止终端输出被打乱,并引入了__name__ == "__main__" 检查以确保清晰,以便代码也可以在 Windows 上运行:

    import multiprocessing, time
    
    def getlock(lck):
        global iolock
        iolock = lck
    
    def printStuff(number):
        with iolock:
            print number
        if number % 2:
            time.sleep(0.5)
        return number*number
    
    def execute():
        def resultAggregator(n):
            with iolock:
                print 'aggregator called...'
            a.append(n)
    
        for i in range(34):
            pool.apply_async(printStuff, (i,), callback=resultAggregator)
            with iolock:
                print "called for ", i
    
    if __name__ == "__main__":
        a = []
        iolock = multiprocessing.Lock()
        pool = multiprocessing.Pool(5, getlock, (iolock,))   
        execute()
        pool.close()
        pool.join()
        print a
    

    后来:错误

    如果您为关键字参数传递None,实际上会引发异常 - 但multiprocessing 会抑制它。唉,这是异步噱头的一个常见问题:没有引发异常的好方法!它们发生在与您的“主程序”当时正在做什么无关的上下文中。

    至少 Python 3.3.2 的 .apply_async() 实现也有一个可选的 error_callback 参数。不知道什么时候介绍的。如果您提供它,异步异常将传递给它,因此您可以决定如何报告(或记录,或忽略......)它们。添加此功能:

    def ouch(e):
        raise e
    

    并将调用更改为:

    pool.apply_async(printStuff, (i,), None, resultAggregator, ouch)
    

    产生以 ouch() 结尾的回溯,并带有以下异常详细信息:

    TypeError: printStuff() argument after ** must be a mapping, not NoneType
    

    因此,至少,使用足够新的 Python,您可以安排不让异步错误无形地传递。

    问答

    您能解释一下 getLock() 中的“全局 iolock”声明吗?我以为 为每个子进程定义一个全局变量,但将名称从 iolock 到 iiolock 中? "main" 使工作进程不知道 iolock。

    抱歉,我无法从那个确切看出你做了什么。名称iolock 旨在成为所有 进程、主进程和子进程中的全局变量。那是因为我的代码中的所有进程使用名称iolock

    例如,如果“通过更改名称……”您的意思是您只是替换了

    iolock = multiprocessing.Lock()
    

    iiolock = multiprocessing.Lock()
    

    那么你会得到一个例外:

    Traceback (most recent call last):
      ...
        pool = multiprocessing.Pool(5, getlock, (iolock,))
    NameError: global name 'iolock' is not defined
    

    如果您也将那行 (pool = ...) 也更改为使用 iiolock,那么当主进程中的 resultAggregator 尝试使用 iolock 时,您会遇到不同的异常:

    Exception in thread Thread-3:
    Traceback (most recent call last):
      ...
      File "mpool.py", line 19, in resultAggregator
        with iolock:
    NameError: global name 'iolock' is not defined
    

    所以我不知道你到底做了什么。

    在执行中声明 printStuff 也会导致静默 错误(代码在打印“要求”之后没有进展)

    那是行不通的。 Python 中的函数未声明 - def 是一个可执行 语句。在执行def printstuff 之前,printStuff 的代码不存在。因为只有主程序执行execute(),所以def'ed在execute()里面的函数只存在于主程序中。这是真的

        pool.apply_async(printStuff, (i,), callback=resultAggregator)
    

    传递 printStuff 给子进程,但是所有传递的东西都通过发送端的酸洗和接收端的解酸来工作,并且函数对象不能被酸洗。你确定没有收到这样的错误吗?:

    _pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
    

    (我在这里使用的是 Python 3 - 也许它在 Python 2 下有所不同)。

    无论如何,Python 不是 Java - 不要疯狂地嵌套 - 保持简单 ;-) 子进程使用的每个函数和类都应该在模块级别定义(class 也是可执行文件Python 中的语句!Python 中唯一的“声明”是 globalnonlocal 语句)。

    更多问答

    你的假设是对的。我全部更改为 iiolock 主要以外的地方

    仍然不知道究竟你做了什么。对于这样的事情,你真的必须发布代码,而不仅仅是描述你做了什么。我只能猜测 - 这真的很痛苦 ;-) 这个怎么样:如果你有一个新问题,打开一个新问题?

    根据您在此处描述的内容(“在除 main 之外的所有位置”),您会在 execute() 中遇到异常,因为新的 iiolock 名称不会存在于主进程中(这是唯一的进程) execute() 运行 - 你说你确实没有改变了 main() 中的旧 iolock)。但是你没有提到一个例外,所以我你并没有真正做到你所说的那样(“在除 main 之外的所有地方”)。

    并期望新进程获得相同的锁 作为参数传递给初始化函数,但每个都有它的 自己的全局 iiolock 变量。多个进程如何共享 无论如何都是相同的变量(内存内容是否不同 每个进程???)。

    对此有两个答案 ;-) 最直接相关的一个是 iolock(在我的原始代码中 - 我真的不知道你的代码现在是什么样子)是由 multiprocessing 创建的对象(它是mp.Lock()) 并通过 mp.Pool() 传递给子进程:

    pool = multiprocessing.Pool(5, getlock, (iolock,)) 
                                             ^^^^^^
    

    mp 控制着这里的一切,并在幕后做一个世界性的事情,以确保这个mp.Lock() 在进程之间具有一致的状态。这不仅仅是任何旧变量,它是 mp 了解的所有内容以及 mp 实现的所有行为。

    第二个答案,到目前为止不适用于本期的任何代码,您还可以使用mp 在“共享内存”中创建某些类型的数据。请参阅 mp.Valuemp.Arraymultiprocessing.sharedctypes 的文档。这些值是真正(物理上)跨进程共享的。

    但除了那些(mp实现的对象,以及从mp获得的“共享内存”)之外,你是对的:没有其他值是共享的(无论是物理上的还是语义上的)跨进程。通过在不同的mp 同步点(如.put() mp.Queue 上的对象和另一个处理.get()s)。

    【讨论】:

    • 完美答案!它似乎完全符合我的要求。
    • @Eugen,你已经非常接近了!在列表(而不是元组)中传递 args 确实有效,但由于没有记录(除了它出现在某些示例中),最好坚持使用元组拼写。您的代码中唯一的“真正”问题是通过 None :-)
    • 您能解释一下 getLock() 中的“全局 iolock”声明吗?我认为它为每个子进程定义了一个全局变量,但是在“main”中将名称从 iolock 更改为 iiolock 会使工作进程不知道 iolock。同样在执行中声明 printStuff 会导致静默错误(代码不会在打印“要求”之后进行)
    • @Eugen,请查看我刚才的编辑 - 无法发表评论。
    • Progress ;-) printStuff() 不必传递,因为它是在模块级别定义的 - 每个 进程都会自动看到它。在 Linux-y 系统上,因为它在 mp 分叉时已经定义,而在 Windows 上,因为每个子进程都将“主程序”作为模块导入(因此重新执行模块中的所有代码)。同样execute() 对所有进程已知,但只有主程序实际调用它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-09
    • 2015-12-23
    • 1970-01-01
    • 2010-09-24
    • 2011-06-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多