【Question title】: Return result of process after an exception is triggered
【Posted】: 2021-09-27 13:59:52
【Question】:

I have a multiprocessing setup that handles a long-running task by appending every computed value to lst. It looks roughly like this:

from multiprocessing import Pool
from time import sleep


def fun(_):
    lst = []  # list that will be returned
    for i in range(200):
        lst.append(i)
        if not i % 10:
            sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
    return lst


if __name__ == '__main__':
    master = []
    processes = 2
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
    print(master)

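I would like to be able to raise a KeyboardInterrupt and have the processes return the lists they have built so far, even if they have not finished, since each iteration just appends a new sublist. (My actual data looks roughly like lst = ([], [[], ...], [[], ...]), where each empty list holds only integers, and the actual function ends with return lst1, lst2, lst3.)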

I tried wrapping the whole main section in a try/except, like this:

try:
    for result in Pool(processes).imap_unordered(fun, range(processes)):
        master.append(result)
except KeyboardInterrupt:
    # somehow retrieve the values here
    pass

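However, I have not found any workable solution along these lines. How can I tell the processes that it is time to exit early and return their current results?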

Edit to show the actual structure:

main.py


from multiprocessing import Pool

from other import Other

class Something:
    def __init__(self):
        pass  # stuff here
    
    def spawner(self):
        for result in Pool(processes=self.processes).imap_unordered(self.loop, range(self.processes)):
            pass  # do stuff with the data

    def loop(self, _):
        # setup stuff
        return Other(setup_stuff).start()  # pass the worker's lists back to spawner

other.py


class Other:
    def __init__(self):
        pass  # more stuff

    def start(self):
        lst1, lst2, lst3 = [], [], []
        for _ in range(self.episodes):
            pass  # do the actual computation
        return lst1, lst2, lst3

【Question comments】:

    Tags: python exception multiprocessing


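    【Solution 1】:

    Perhaps you can use a multiprocessing.Queue instead of a list to return the values. Set up one queue at the start and have all processes write to it.

    At the end, read all the values from the queue.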

    from time import sleep
    from multiprocessing import Pool, Queue
    
    q = None
    
    
    def set_global_data(queue):
        global q
        q = queue
    
    
    def fun(_):
        for i in range(200):
            q.put_nowait(i)
            if not i % 10:
                sleep(0.1)  # 'long task', cause a KeyboardInterrupt in this time
        # nothing is returned
    
    
    if __name__ == "__main__":
        master = Queue()
        processes = 2
    
        try:
            with Pool(processes, set_global_data, (master,)) as p:
                for result in p.imap_unordered(fun, range(processes)):
                    pass
        except KeyboardInterrupt:
            pass
    
        while not master.empty():
            v = master.get_nowait()
            print(v)
    

    Edit: with multiple files:

    main.py

    from other import Other
    from multiprocessing import Pool, Queue
    
    
    class Something:
        def __init__(self):
            pass  # stuff here
    
        def spawner(self):
            master = Queue()
    
            try:
                with Pool(2, Something.set_global_data, (master,)) as p:
                    for _ in p.imap_unordered(self.loop, range(2)):
                        pass
            except KeyboardInterrupt:
                pass
    
            while not master.empty():
                v = master.get_nowait()
                print(v)
    
        def loop(self, _):
            # setup stuff
            Other().start()
    
        @staticmethod
        def set_global_data(queue):
            Other.q = queue
    
    
    if __name__ == "__main__":  # import guard, required under the "spawn" start method
        s = Something()
        s.spawner()
    

    other.py

    from time import sleep
    
    
    class Other:
        q = None
    
        def __init__(self):
            pass  # more stuff
    
        def start(self):
            for i in range(200):
                Other.q.put_nowait(i)
                if not i % 10:
                    sleep(0.1)
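
If the workers should still return their partial lists, as the question originally asked, one possible variation is a shared stop flag instead of a queue. The sketch below is not part of the answer above: init_worker, collect and the shortened sleep are hypothetical names and values, and it assumes the workers can ignore SIGINT so that only the parent reacts to Ctrl-C and then asks them to stop through a multiprocessing.Event.

```python
import signal
from multiprocessing import Event, Pool
from time import sleep

_stop = None  # set in each worker by init_worker (hypothetical helper)


def init_worker(stop_event):
    """Store the shared flag and make the worker ignore Ctrl-C."""
    global _stop
    _stop = stop_event
    # Assumption: only the parent handles SIGINT; workers wait for the flag.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def collect(stop_event, n=200):
    """Build the list step by step, returning early once the flag is set."""
    lst = []
    for i in range(n):
        if stop_event.is_set():
            break
        lst.append(i)
        if not i % 10:
            sleep(0.01)  # stand-in for the long task
    return lst


def fun(_):
    return collect(_stop)


def main():
    processes = 2
    stop = Event()
    with Pool(processes, init_worker, (stop,)) as pool:
        pending = pool.map_async(fun, range(processes))
        try:
            master = pending.get()
        except KeyboardInterrupt:
            stop.set()  # ask the workers to wrap up
            master = pending.get(timeout=5)  # partial lists arrive here
    print([len(part) for part in master])


if __name__ == "__main__":
    main()
```

Because collect only checks the flag once per iteration, a worker finishes the step it is in before returning, so this suits loops whose individual iterations are short.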
    

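    【Comments】:

    • This seems to work, but I did not mention that the actual computation happens in a different file, so global q does not work. I will edit my question.
    • @ArcKoor See my edit.
    • Well done, thank you very much!
    • @AndrejKesely It is best practice to get all results from the queue inside the with Pool context rather than trusting queue.empty(). Otherwise a deadlock or lost data can occur. Also, the "main.py" example lacks the import guard needed to run under the "spawn" context.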
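
The advice in the last comment can be sketched roughly as follows. This is an illustration under stated assumptions, not the answerer's code: SENTINEL, init_worker and drain are hypothetical names, None is assumed never to be a real value, and the 2-second timeout is an arbitrary fallback for tasks that die without sending their marker. Each task puts a marker on the queue when it ends, and the parent reads inside the with Pool block until it has seen one marker per task, so it never relies on queue.empty().

```python
import queue
from multiprocessing import Pool, Queue
from time import sleep

SENTINEL = None  # hypothetical end marker; assumes None is never a real value
_q = None  # set in each worker by init_worker


def init_worker(shared_queue):
    global _q
    _q = shared_queue


def fun(_):
    try:
        for i in range(50):
            _q.put(i)
            if not i % 10:
                sleep(0.05)  # stand-in for the long task
    except KeyboardInterrupt:
        pass  # stop early; values already queued are kept
    finally:
        _q.put(SENTINEL)  # one marker per task, even when interrupted


def drain(q, n_tasks, items, timeout=2.0):
    """Append values to items until n_tasks markers arrive or the queue goes quiet."""
    finished = 0
    while finished < n_tasks:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            break  # assume a task died without sending its marker
        except KeyboardInterrupt:
            continue  # Ctrl-C reached the parent too; keep collecting
        if item is SENTINEL:
            finished += 1
        else:
            items.append(item)


def main():
    processes = 2
    shared = Queue()
    results = []
    with Pool(processes, init_worker, (shared,)) as pool:
        pool.map_async(fun, range(processes))
        drain(shared, processes, results)  # read while the pool is still alive
    print(len(results))


if __name__ == "__main__":
    main()
```

Counting markers tells the parent when every task has finished, which empty() cannot do, since a queue can look empty while a worker is still flushing its data.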