【问题标题】:How to manage a pool of processes?如何管理进程池?
【发布时间】:2018-09-28 13:56:15
【问题描述】:

我正在尝试在 Windows 10 上设置多处理池。

基本上一些cpu(在我的例子中是12个)应该从Qin读取并将结果写入Qout。在Qin 中写入'end' 时,该进程应该停止。

由于某种原因,进程挂起。

我开发了一个简单的版本:

from multiprocessing import Pool, Queue, Event
import os,time


def worker( Qin, Qout, event):
    time.sleep(5)
    while True:
        item = Qin.get()
        if item == 'end':
            event.set()
        else:
            Qout.put(item)
        time.sleep(1)

def manager():
    Qin,Qout,event= Queue(), Queue(), Event()
    processes = os.cpu_count()
    pool = Pool(processes=processes)
    for _ in range(processes):
        pool.apply_async(worker,args= (Qin,Qout,event,))
    for i in range(100):
        print(i)
        Qin.put(i)

    Qin.put('end')

    pool.close()
    event.wait()
    pool.terminate()
    return Qout

Qout = manager()

【问题讨论】:

  • 如果您在 Windows 上,您需要检查 __main__,否则每个进程都会一次又一次地调用 manager... 请参阅 安全导入主模块spawn and forkserver start methods 文档中

标签: python windows events multiprocessing pool


【解决方案1】:

您需要了解异步编程在 python 中的正确工作方式。当您调用 apply_async 时,您将获得 Future 对象。 python中的Queue实现依赖于系统管道将数据从一个进程传输到另一个进程,并使用一些信号量来保护该管道上的读写。

from multiprocessing import Pool, Queue, Event
import os
import time
import multiprocessing

def worker( Qin, Qout, event):
    print('worker')
    time.sleep(1)
    event.set()

def manager():
    processes = multiprocessing.cpu_count()
    m = multiprocessing.Manager()
    Qin = m.Queue()
    Qout = m.Queue()
    event = m.Event()
    pool = Pool(processes=processes)
    result = pool.apply_async(worker, (Qin, Qout, event))
    result.get()
    pool.close()
    event.wait()
    return Qout

if __name__ == '__main__':
    Qout = manager()

【讨论】:

    【解决方案2】:

    我认为您的代码挂起的原因是因为所有工作任务最终都在等待输入队列中同时出现 item = Qin.get() 行,因为 get() “阻塞”等待放置一些东西在队列中。避免这种情况的一种方法是改用非阻塞get_nowait() 方法。这样做需要代码来处理它可能引发的任何Empty 异常,但它避免了该进程中的任何进一步执行在此时有效地停止。

    为了让事情正常工作,您需要创建和使用multiprocessing.Manager,它会创建一个服务器进程,该进程包含 Python 对象并允许其他进程通过代理来操作它们。请参阅文档Sharing state between processes 部分的“服务器进程”部分。

    另外,当在 Windows 上使用 multiprocessing 时,确保主进程的代码在 if __name__ == '__main__': 语句中被有条件地执行是非常重要的。这是因为该模块是如何在该平台上实现的 - 否则每次启动另一个并发任务时都会再次执行代码(这涉及到它被他们imported)。

    以下是您的代码,需要进行修改,因此它使用multiprocessing.Manager。请注意,我更改了 manager() 函数的名称,以避免与用于创建共享对象的 multiprocessing.Manager 混淆。

    import multiprocessing
    from queue import Empty as QueueEmpty
    import os
    import time
    
    END_MARKER = 'end'
    
    
    def worker(id, Qin, Qout, event):
        while True:
            try:
                item = Qin.get_nowait()  # Non-blocking.
            except QueueEmpty:
                if event.is_set():  # Last item seen?
                   break
                continue # Keep polling.
    
            if item == END_MARKER:  # Last item?
                event.set()
                break  # Quit.
    
            Qout.put('{} via worker {}'.format(item, id))
            time.sleep(.25)
    
    
    def pool_manager():
        processes = os.cpu_count()
        pool = multiprocessing.Pool(processes=processes)
        manager = multiprocessing.Manager()
        Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()
    
        for i in range(100):
            Qin.put(i)
    
        Qin.put(END_MARKER)
    
        for id in range(processes):
            pool.apply_async(worker, (id, Qin, Qout, event))
    
        pool.close()  # Done adding tasks.
        pool.join()  # Wait for all tasks to complete.
    
        return Qout
    
    
    if __name__ == '__main__':
        print('Processing')
        Qout = pool_manager()
    
        print('Contents of Qout:')
        while not Qout.empty():
            item = Qout.get()
            print(' ', item)
    
        print('End of script')
    

    【讨论】:

      猜你喜欢
      • 2021-10-19
      • 1970-01-01
      • 2013-04-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-04-15
      相关资源
      最近更新 更多