【问题标题】:Python multiprocessing: terminate processes by arguments after exceeding timeoutPython多处理:超过超时后通过参数终止进程
【发布时间】:2021-05-22 00:35:08
【问题描述】:

我想针对kn 的不同值计算长时间运行函数的运行时间。 例如:k = [2,3,...,100]n = [50,100,150,200,...,1000]

如果特定 (k, n) 元组的运行时间超过特定时间量(例如,60 seconds)我想用更大的n 停止每个进程的执行(并将运行时间设置为inf)。 例如,如果n = 500 超时,我想取消所有带有n >= 500 的任务的执行。

我尝试使用 Python 的 multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor,但找不到取消正在运行的任务的方法。据我发现,在它运行时无法取消它。

我认为也许我应该以不同的方式解决这个问题。

请指教。

import multiprocessing
import random
import time
from datetime import datetime
from itertools import product
from multiprocessing import Pool

n_list = [n * 50 for n in range(1, 21)]
k_list = [k for k in range(2, 31)]
k_n_list = list(product(k_list, n_list))

def long_running_function(k, n):
    start_time = datetime.now()
    time.sleep(random.randint(2,120))
    end_time = datetime.now()
    running_time = end_time - start_time

    return k, n, running_time.total_seconds()


running_times = []

with Pool(processes=multiprocessing.cpu_count()) as pool:
    async_results = []
    for k, n in k_n_list:
        async_results.append((k, n, pool.apply_async(func=long_running_function, args=(k, n))))

    for k, n, result in async_results:
        try:
            process_result = result.get(60)  # timeout after 60 seconds
            running_times.append(process_result)

        except multiprocessing.TimeoutError:
            print(f"Timeout for k = {k}, n = {n}")
            running_times.append((k, n, float('inf')))

            # HERE I WOULD LIKE TO CANCEL EVERY TASK WITH N >= n

【问题讨论】:

    标签: python multithreading multiprocessing threadpool threadpoolexecutor


    【解决方案1】:

    首先你应该知道的是下面的语句......

    process_result = result.get(60)  # timeout after 60 seconds
    

    ...如果与result 关联的任务尚未完成,将引发multiprocessing.TimeoutError但它不会终止任务;任务继续运行。但是,当调用 pool.terminate() 时,无论是在您退出 with Pool ... as pool: 块时隐式调用还是显式调用时,池中的所有进程(当然还有它们当前正在运行的任务)都将被终止。但是千万别想用concurrent.futures创建的进程池;没有任何方法可以在所有任务完成之前终止进程。

    其次,您在os.cpu_count() 大小的进程池中运行k * n 任务,其中任务数可能远大于您拥有的池中的进程数。因此,有可能当你发现自己的一项任务在 60 秒内没有完成时,还有很多任务甚至还没有开始运行。这总是有问题的,因为您将为所有具有特定 n 值的任务留出 60 秒的时间来完成,但许多任务在被终止之前甚至没有机会开始。

    第三,在你执行的循环中......

    process_result = result.get(60)
    

    您正在测试的 AsynchResult 实例可能会在 3 秒后返回结果(而不是超时)。 但是自您提交任务以来已经过去了 3 秒。在下一次迭代中,您现在只想等待 57 秒等待下一个结果!

    一种可能的解决方案是使用一个multiprocessing.Value 实例,该实例存储在所有进程的共享内存中,因此对初始化为sys.maxsize 的所有任务都是可见的。您的工作函数必须定期检查此Value 的值,如果小于或等于它们正在处理的n 的值,这是工作函数立即正常返回的信号。因此,代码变成了下面的样子(注意我为了演示的目的已经改变了一些参数):

    import multiprocessing
    import random
    import time
    from datetime import datetime
    from itertools import product
    from multiprocessing import Pool, Value
    import sys
    import ctypes
    
    
    def init_pool(v):
        global stop_n
        stop_n = v
    
    def long_running_function(k, n):
        print('n =', n)
        start_time = datetime.now()
        #sleep_time = random.randint(2, 10)
        sleep_time = n / 100 + .3
        t_stop = time.time() + sleep_time
        while time.time() < t_stop:
            if n >= stop_n.value:
                print('quitting because my n is', n)
                break
            time.sleep(.1)
        end_time = datetime.now()
        running_time = end_time - start_time
    
        return k, n, running_time.total_seconds()
    
    
    # required for Windows:
    if __name__ == '__main__':
        n_list = [n * 50 for n in range(1, 20)]
        k_list = [k for k in range(2, 3)]
        k_n_list = list(product(k_list, n_list))
    
        running_times = []
    
        stop_n = Value(ctypes.c_ulonglong,  sys.maxsize)
    
        # best to leave one processor free for main process
        with Pool(processes=multiprocessing.cpu_count() - 1, initializer=init_pool, initargs=(stop_n,)) as pool:
            async_results = []
            for k, n in k_n_list:
                async_results.append((k, n, pool.apply_async(func=long_running_function, args=(k, n))))
    
            TIMEOUT = 4 # timeout after 4 seconds
            start_time = time.time()
            for k, n, result in async_results:
                try:
                    time_to_wait = TIMEOUT - (time.time() - start_time)
                    if time_to_wait < 0:
                        time_to_wait = 0
                    process_result = result.get(time_to_wait)
                except multiprocessing.TimeoutError:
                    # signal to tasks whose n argument is >= than this value of n:
                    print('setting stop value to', n)
                    stop_n.value = n
                    break
    
            # now process actual results:
            for k, n, result in async_results:
                process_result = result.get()
                running_times.append(process_result)
            print(running_times)
    

    打印:

    n = 50
    n = 100
    n = 150
    n = 200
    n = 250
    n = 300
    n = 350
    n = 400
    n = 450
    n = 500
    n = 550
    n = 600
    n = 650
    n = 700
    setting stop value to 400
    quitting because my n is 400
    n = 750
    quitting because my n is 750
    n = 800
    quitting because my n is 800
    quitting because my n is 500
    quitting because my n is 450
    n = 850
    n = 900
    n = 950
    quitting because my n is 850
    quitting because my n is 900
    quitting because my n is 950
    quitting because my n is 550
    quitting because my n is 600
    quitting because my n is 650
    quitting because my n is 700
    [(2, 50, 0.803502), (2, 100, 1.306462), (2, 150, 1.807341), (2, 200, 2.308982), (2, 250, 2.812402), (2, 300, 3.315068), (2, 350, 3.81634), (2, 400, 3.114924), (2, 450, 2.627066), (2, 500, 2.124075), (2, 550, 1.607504), (2, 600, 1.104059), (2, 650, 0.604383), (2, 700, 0.100104), (2, 750, 0.001005), (2, 800, 0.000999), (2, 850, 0.002), (2, 900, 0.001999), (2, 950, 0.001999)]
    

    您会观察到,在我的具有 8 个核心的桌面上,其中 7 个已分配给池,在共享的Value 设置为400 时,有几个任务正在等待启动,因此当它们执行start 他们立即终止(你可以看到他们的运行时间非常小)。正如我所说,您尝试这种方式是有问题的。 最好在将Value 设置为n 之后,为每个适用的任务而不是立即返回,给自己一定的秒数来完成。

    更新

    如果您希望已经开始主进程的任务无论如何都完成(因为他们无法检查stop_n),请将long_range_function更改为:

    def long_running_function(k, n):
        start_time = datetime.now()
        print('n =', n)
        if n < stop_n.value:
            #time.sleep(random.randint(2, 10))
            time.sleep(n / 100 + .3)
        else:
            print('quitting because my n is', n)
        end_time = datetime.now()
        running_time = end_time - start_time
    
        return k, n, running_time.total_seconds()
    

    现在打印:

    n = 50
    n = 100
    n = 150
    n = 200
    n = 250
    n = 300
    n = 350
    n = 400
    n = 450
    n = 500
    n = 550
    n = 600
    n = 650
    n = 700
    setting stop value to 400
    n = 750
    quitting because my n is 750
    n = 800
    quitting because my n is 800
    n = 850
    quitting because my n is 850
    n = 900
    quitting because my n is 900
    n = 950
    quitting because my n is 950
    [(2, 50, 0.801908), (2, 100, 1.300968), (2, 150, 1.800735), (2, 200, 2.301075), (2, 250, 2.800968), (2, 300, 3.301077), (2, 350, 3.800717), (2, 400, 4.301718), (2, 450, 4.801664), (2, 500, 5.301043), (2, 550, 5.800506), (2, 600, 6.300665), (2, 650, 6.800603), (2, 700, 7.301471), (2, 750, 0.0), (2, 800, 0.0), (2, 850, 0.0), (2, 900, 0.0), (2, 950, 0.001015)]
    

    【讨论】:

    • 谢谢@Booboo!是否可以在不检查工作线程中的n 的情况下完成您的建议?实际上我的工人运行一些库函数。我没有任何“while 循环”(或类似的东西)来定期检查它。也许我应该让他们完成并仅在开始时检查..
    • 现在你说已经运行的任务应该被允许完成,因为它们无法检查(可能是值stop_n.value),因为没有while loop。我明白了。但是,我也知道他们可以在任务启动时进行检查。我不明白你所说的“不检查n”是什么意思。你需要比较nstop_n.value开头的long_running_function
    • 查看更新后的long_running_function 新输出。
    猜你喜欢
    • 2019-04-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-15
    • 1970-01-01
    • 2020-05-05
    相关资源
    最近更新 更多