【问题标题】:Python multiprocessing imap - discard timeout processesPython多处理imap - 丢弃超时进程
【发布时间】:2021-12-23 21:32:33
【问题描述】:

使用 Python 多处理我想捕获进程丢弃它们并继续下一个进程。

在下面的示例中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程会重新执行,因此脚本将永远运行。

如何捕获 TimeOut 错误、终止导致该错误的进程并防止该进程重新执行?重要的是我可以使用 imap 做到这一点。

import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x
    
    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x


if __name__ == "__main__":
    solutions = []

    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]

    with mp.get_context("spawn").Pool(1) as pool:
        futures_res = pool.imap(a_func, inputs)
        idx = 0
        for s in (inputs):
            try:
                res = futures_res.next(timeout=0.1)
                # If successful (no time out), append the result
                solutions.append(res)
            
            except mp.context.TimeoutError:
                print(s, "err")
                # Catch time out error
                # I want this to also prevent the process from being executed again
                # solutions.append(0.0)

    # Should print 4
    print(len(solutions))
    print(solutions)

【问题讨论】:

    标签: python python-3.x multiprocessing


    【解决方案1】:

    您可能对imap 如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从顶部开始:

    为了确定当您对imap 返回的迭代器执行next(timeout=some_value) 时是否会引发multiprocessing.TimeoutError 异常,计时开始于任务被进程从队列中取出以执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会启动,即第三个任务的计时开始而不是从提交所有任务开始。

    但是当你得到一个超时异常时,正在执行的任务实际上并没有发生任何事情——它会继续执行。您仅从 imap 迭代返回值 6 次。但是如果你无限地迭代直到你得到一个StopIteration 异常,你最终会看到所有任务最终都完成并返回了一个值,可能会在此过程中抛出多个超时错误。

    一种解决方案是继续从inputs 列表中删除与您正在迭代其结果的任务相对应的输入值,但是一旦您遇到超时异常,您就会终止池中的剩余任务(如果有的话) inputs 列表中仍保留的任何输入,使用新的 inputs 列表重新运行 imap

    三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给imap,因为imap“lazily”评估了pasaed iterable,并且您将在迭代imap和@的返回值时修改inputs列表如果您没有通过副本,否则 987654335@ 仍会评估 inputs。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给工作函数,我仍然不时遇到超时异常。

    import time
    import multiprocessing as mp
    
    def a_func(x):
        print(x)
        if x:
            return x
    
        # Function sleeps before returning
        # to trigger timeout error
        else:
            time.sleep(2.0)
            return x
    
    
    if __name__ == "__main__":
        solutions = []
    
        # Inputs sum to 4
        inputs = [1, 1, 0, 1, 1, 0]
    
        while inputs:
            with mp.get_context("spawn").Pool(1) as pool:
                futures_res = pool.imap(a_func, inputs.copy())
                while inputs:
                    s = inputs.pop(0)
                    try:
                        res = futures_res.next(timeout=.5)
                        # If successful (no time out), append the result
                        solutions.append(res)
                    except mp.context.TimeoutError:
                        print(s, "err")
                        break
    
        # Should print 4
        print(len(solutions))
        print(solutions)
    

    打印:

    1
    1
    0
    0 err
    1
    1
    0
    0 err
    4
    [1, 1, 1, 1]
    

    【讨论】:

    • 谢谢,这已经为我解决了。您的解决方案看起来很适合保持剩余任务运行,同时删除导致我出现问题的任务!
    猜你喜欢
    • 2017-04-18
    • 2014-11-21
    • 1970-01-01
    • 1970-01-01
    • 2018-10-25
    • 2021-05-22
    • 2016-12-07
    • 2017-04-09
    • 1970-01-01
    相关资源
    最近更新 更多