【发布时间】:2013-12-02 01:24:08
【问题描述】:
我需要执行一个包含许多并行数据库连接和查询的池。我想使用 multiprocessing.Pool 或 concurrent.futures ProcessPoolExecutor。 Python 2.7.5
在某些情况下,查询请求耗时过长或永远无法完成(挂起/僵尸进程)。我想从已超时的 multiprocessing.Pool 或 concurrent.futures ProcessPoolExecutor 中终止 特定 进程。
这是一个如何杀死/重新生成整个进程池的示例,但理想情况下,我会尽量减少 CPU 抖动,因为我只想杀死超时秒后未返回数据的特定长时间运行的进程。
由于某种原因,在所有结果返回并完成后,下面的代码似乎无法终止/加入进程池。它可能与在超时发生时杀死工作进程有关,但是当它们被杀死并且结果符合预期时,池会创建新的工作进程。
from multiprocessing import Pool
import time
import numpy as np
from threading import Timer
import thread, time, sys
def f(x):
time.sleep(x)
return x
if __name__ == '__main__':
pool = Pool(processes=4, maxtasksperchild=4)
results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]
while results:
try:
x, result = results.pop(0)
start = time.time()
print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
except Exception as e:
print str(e)
print '%d Timeout Exception! in %f' % (x, time.time()-start)
for p in pool._pool:
if p.exitcode is None:
p.terminate()
pool.terminate()
pool.join()
【问题讨论】:
标签: python process timeout multiprocessing pool