【Title】: Python multiprocessing: skip a child's segfault
【Posted】: 2022-01-18 05:41:15
【Question】:

I'm trying to use multiprocessing with a function that can segfault (I have no control over that ATM). When a child process hits a segfault, I want only that one child to fail, while all the other child tasks continue and return their results.

I have already switched from multiprocessing.Pool to concurrent.futures.ProcessPoolExecutor, which avoids the problem of the child process hanging forever (or until an arbitrary timeout) described in this bug: https://bugs.python.org/issue22393

The problem I face now, however, is that as soon as the first child task hits a segfault, all in-flight child processes get marked as broken (concurrent.futures.process.BrokenProcessPool).

Is there a way to mark only the child process that actually broke as broken?

The code I'm running, on Python 3.7.4:

import concurrent.futures
import ctypes
from time import sleep


def do_something(x):
    print(f"{x}; in do_something")
    sleep(x*3)
    if x == 2:
        # raise a segmentation fault internally
        return x, ctypes.string_at(0)
    return x, x-1


nums = [1, 2, 3, 1.5]
executor = concurrent.futures.ProcessPoolExecutor()
result_futures = []
for num in nums:
    # Using submit with a list instead of map lets you get past the first exception
    # Example: https://stackoverflow.com/a/53346191/7619676
    future = executor.submit(do_something, num)
    result_futures.append(future)

# Wait for all results
concurrent.futures.wait(result_futures)

# After a segfault is hit for any child process (i.e. is "terminated abruptly"), the process pool becomes unusable
# and all running/pending child processes' results are set to broken
for future in result_futures:
    try:
        print(future.result())
    except concurrent.futures.process.BrokenProcessPool:
        print("broken")

Result:

(1, 0)
broken
broken
(1.5, 0.5)

Desired result:

(1, 0)
broken
(3, 2)
(1.5, 0.5)

【Question Comments】:

    Tags: python python-3.x multiprocessing python-multiprocessing concurrent.futures


    【Solution 1】:

    Both multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor bake in assumptions about how to handle the concurrency of worker/parent interaction that are violated if any one process is killed or segfaults, so they do the safe thing and mark the whole pool as broken. To get around this, you will need to build your own pool, with different assumptions, directly out of multiprocessing.Process instances.
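    Some background that makes a hand-rolled pool tractable: when a worker dies from a signal, the parent can see that on Process.exitcode, which is set to the negative signal number. A minimal POSIX-only sketch (the _crash and segfault_exitcode names are illustrative, and the crash is simulated by sending SIGSEGV rather than by a real bad memory access):

```python
import multiprocessing
import os
import signal


def _crash():
    # Simulate a native crash by delivering SIGSEGV to this process
    os.kill(os.getpid(), signal.SIGSEGV)


def segfault_exitcode():
    # "fork" keeps the sketch self-contained (no pickling of _crash),
    # at the cost of being POSIX-only
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=_crash)
    proc.start()
    proc.join()
    # A child killed by a signal has exitcode == -signum
    return proc.exitcode


if __name__ == "__main__":
    print(segfault_exitcode())  # -11 on Linux, i.e. -signal.SIGSEGV
```

    This is how a supervising parent tells "worker returned" apart from "worker was killed": a nonnegative exitcode is a normal exit, a negative one is a signal death.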

    That might sound intimidating, but a list and a multiprocessing.Manager will get you pretty far:

    import multiprocessing
    import ctypes
    import queue
    from time import sleep
    
    def do_something(job, result):
        while True:
            x = job.get()
            print(f"{x}; in do_something")
            sleep(x*3)
            if x == 2:
                # raise a segmentation fault internally
                return x, ctypes.string_at(0)
            result.put((x, x-1))
    
    nums = [1, 2, 3, 1.5]
    
    if __name__ == "__main__":
        # you ARE using the spawn context, right?
        ctx = multiprocessing.get_context("spawn")
        manager = ctx.Manager()
        job_queue = manager.Queue(maxsize=-1)
        result_queue = manager.Queue(maxsize=-1)
        pool = [
            ctx.Process(target=do_something, args=(job_queue, result_queue), daemon=True)
            for _ in range(multiprocessing.cpu_count())
        ]
        for proc in pool:
            proc.start()
        for num in nums:
            job_queue.put(num)
        try:
            while True:
                # Timeout is our only signal that no more results coming
                print(result_queue.get(timeout=10))
        except queue.Empty:
            print("Done!")
        print(pool)  # will see one dead Process 
        for proc in pool:
            proc.kill()  # avoid stderr spam
    

    This "pool" is a little inflexible, and you will probably want to customize it to your application's specific needs, but you can definitely skip right over segfaulting workers.
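    One customization worth sketching, since a dead Process stays visible in the pool list: dead workers can be detected with is_alive() and replaced with fresh ones, so the pool doesn't permanently lose capacity after each segfault. The replace_dead_workers helper below is hypothetical, not part of the answer's code:

```python
import multiprocessing
import time


def replace_dead_workers(pool, ctx, target, args=()):
    # Return a new worker list in which any dead worker (normal exit or
    # signal death such as SIGSEGV) is replaced by a freshly started one.
    fresh_pool = []
    for proc in pool:
        if proc.is_alive():
            fresh_pool.append(proc)
        else:
            # proc.exitcode < 0 would indicate death by signal (e.g. -11)
            replacement = ctx.Process(target=target, args=args, daemon=True)
            replacement.start()
            fresh_pool.append(replacement)
    return fresh_pool


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    worker = ctx.Process(target=time.sleep, args=(0,))
    worker.start()
    worker.join()  # worker is now dead, standing in for a crashed one
    pool = replace_dead_workers([worker], ctx, time.sleep, (30,))
    print([p.is_alive() for p in pool])  # [True]
    for p in pool:
        p.kill()
```

    In the answer's loop, a check like this could run each time result_queue.get() times out, before deciding whether all work is really done.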

    When I went down this rabbit hole, I was interested in cancelling specific submissions to a worker pool, and I ended up writing a whole library to integrate it into Trio async apps: trio-parallel. Hopefully you won't need to go that far!

    【Comments】:
