【问题标题】:multiprocessing threadpool not terminating as expected多处理线程池未按预期终止
【发布时间】:2016-04-16 01:55:34
【问题描述】:

我在 Windows 7 上运行 Python 3.4。

我正在尝试更多地了解多处理,并尝试编写一个函数,该函数在另一个函数调用上执行干净的小超时。但是我遇到了一个我无法弄清楚的问题。

根据关于多处理的 Python 文档:

Terminate() "立即停止工作进程而不完成未完成的工作。"

但是,当我对此进行测试时,pool.terminate() 似乎在等待工作进程完成而不是杀死它们!

所以当我运行这段代码时:

import multiprocessing.pool
from time import sleep

def timeout(seconds, function, *args, **kwargs):

    pool = multiprocessing.pool.ThreadPool(processes = 1)
    result = pool.apply_async(function, args, kwargs)

    try: result.get(seconds)
    except multiprocessing.context.TimeoutError:
        print("Process timed out")
        pool.terminate()
        pool.join()
        print("Pool terminated")
    finally:
        pool.close()

def worker():
    for n in range(5):
        sleep(1)
        print(n+1)
    print("Process succeeded")

timeout(2.5, worker)

我希望结果是这样的:

1
2
Process timed out
Pool terminated

但是我得到了这个:

1
2
Process timed out
3
4
5
Process succeeded
Pool terminated

我知道result.get 引发了TimeoutError,因为消息“进程超时”已成功打印。而且我知道pool.terminate() 被调用是出于同样的原因,它似乎没有做任何事情!

我觉得这里有些东西我只是不理解。有人可以帮帮我吗?

【问题讨论】:

  • 我猜这是一个 Windows 问题 - 对于 Linux,它几乎肯定会发送一个信号,这在 Windows 7 AFAIK 上不可用。

标签: python multiprocessing


【解决方案1】:

我不知道为什么,但问题似乎是由 pool.join() 调用引起的,无论如何你并不需要它,因为工作进程应该被前面的 terminate() 调用终止。

import multiprocessing.pool
from time import sleep

def timeout(seconds, function, *args, **kwargs):
    pool = multiprocessing.pool.ThreadPool(processes=1)
    result = pool.apply_async(function, args, kwargs)
    try:
        result.get(timeout=seconds)
    except multiprocessing.TimeoutError:
        print("Process timed out")
    pool.terminate()
#    pool.join()  # Don't need this, all worker threads have been stopped.
    print("Pool terminated")

def worker():
    for n in range(5):
        sleep(1)
        print(n+1)
    print("Process succeeded")

timeout(2.5, worker)

输出:

1
2
Process timed out
Pool terminated

不管怎样,请注意,从 3.3 版开始,Pool 对象支持上下文管理协议,这意味着Pool.terminate() 在使用时会自动调用——所以函数可以更简洁地写成这样:

def timeout(seconds, function, *args, **kwargs):
    with multiprocessing.pool.ThreadPool(processes=1) as pool:
        result = pool.apply_async(function, args, kwargs)
        try:
            result.get(timeout=seconds)
        except multiprocessing.TimeoutError:
            print("Process timed out")
    print("Pool terminated")

【讨论】:

  • 谢谢!这是一个非常有用的答案。我正在遵循我在其他地方找到的一些建议(现在找不到),您应该始终遵循带有 join() 的 terminate(),但我想那是废话。我确实注意到,虽然这在正常运行 python 脚本时有效,但在 IDLE 中运行时它不起作用。你有什么想法可以解决这个问题吗?还是要求太多?
  • 不客气。抱歉,不知道为什么它在 IDLE 中也不起作用(特别是因为我自己不使用它)。到底是怎么不工作的?
  • 作为一个实验,尝试通过在 if __name__ == '__main__': 前面加上 if __name__ == '__main__':(或任何 IDLE 等效项)来使 timeout(2.5, worker) 成为条件。
猜你喜欢
  • 2013-04-30
  • 1970-01-01
  • 1970-01-01
  • 2023-03-18
  • 2020-09-13
  • 1970-01-01
  • 2023-02-20
  • 2019-11-17
  • 2020-11-19
相关资源
最近更新 更多