【问题标题】:python multiprocessing apply_async only uses one processpython multiprocessing apply_async 只使用一个进程
【发布时间】:2012-09-18 19:16:40
【问题描述】:

我有一个脚本,其中包括从列表中打开一个文件,然后对该文件中的文本执行某些操作。我正在使用 python 多处理和 Pool 来尝试并行化此操作。脚本的抽象如下:

import os
from multiprocessing import Pool

results = []
def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()

当我运行此程序时,每次迭代的进程 ID 打印输出都是相同的。基本上我要做的是获取输入列表的每个元素并将其分叉到一个单独的进程中,但似乎一个进程正在完成所有工作。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:
    • apply_async 将一项任务分配给池。你需要打电话 apply_async 多次锻炼更多的处理器。
    • 不允许两个进程尝试写入同一个列表, results。由于池工作者是独立的进程,这两个 不会写入同一个列表。解决此问题的一种方法是使用输出队列。您可以自己设置,或使用apply_async 的回调为您设置队列。 apply_async 将在函数完成后调用回调。
    • 您可以使用map_async 而不是apply_async,但是您会 获取列表列表,然后您必须将其展平。

    所以,不妨试试类似的方法:

    import os
    import multiprocessing as mp
    
    results = []   
    
    def testFunc(file):
        result = []
        print "Working in Process #%d" % (os.getpid())
        # This is just an illustration of some logic. This is not what I'm
        # actually doing.
        with open(file, 'r') as f:
            for line in f:
                if 'dog' in line:
                    result.append(line)
        return result
    
    
    def collect_results(result):
        results.extend(result)
    
    if __name__ == "__main__":
        p = mp.Pool(processes=2)
        files = ['/path/to/file1.txt', '/path/to/file2.txt']
        for f in files:
            p.apply_async(testFunc, args=(f, ), callback=collect_results)
        p.close()
        p.join()
        print(results)
    

    【讨论】:

    • 调用 p.join() 后没有任何反应。
    【解决方案2】:

    也许在这种情况下你应该使用map_async:

    import os
    from multiprocessing import Pool
    
    results = []
    def testFunc(file):
        message =  ("Working in Process #%d" % (os.getpid()))
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)
        return message
    
    if __name__=="__main__":
        print("saddsf")
        p = Pool(processes=2)
        files = ['/path/to/file1.txt', '/path/to/file2.txt']
        results = p.map_async(testFunc, files)
        print(results.get())
    

    【讨论】:

    • 如果你马上要去results.get(),也许只是map
    • 我很欣赏这个答案,但出于各种原因我试图坚持使用 apply_async。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-20
    • 2018-10-09
    • 2013-01-04
    相关资源
    最近更新 更多