【问题标题】:How to terminate multiprocessing Pool processes?如何终止多处理池进程?
【发布时间】:2013-04-30 08:14:26
【问题描述】:

我正在开发渲染农场,我需要我的客户能够启动渲染器的多个实例,而不会阻塞,以便客户端可以接收新命令。我的工作正常,但是在终止创建的进程时遇到问题。

在全局级别,我定义了我的池(以便我可以从任何函数访问它):

p = Pool(2)

然后我用 apply_async 调用我的渲染器:

for i in range(totalInstances):
    p.apply_async(render, (allRenderArgs[i],args[2]), callback=renderFinished)
p.close()

该函数完成,在后台启动进程,并等待新命令。我做了一个简单的命令,它将杀死客户端并停止渲染:

def close():
    '''
        close this client instance
    '''
    tn.write ("say "+USER+" is leaving the farm\r\n")
    try:
        p.terminate()
    except Exception,e:
        print str(e)
        sys.exit()

它似乎没有给出错误(它会打印错误),python 终止但后台进程仍在运行。谁能推荐一种更好的方法来控制这些已启动的程序?

【问题讨论】:

  • 尝试使用from multiprocessing import util; util.get_logger().setLevel(util.DEBUG) 启用调试日志并粘贴输出。
  • 我以前见过这样的行为,但现在无法重现......我想知道在调用 p.terminate() 之后调用 p.join() 是否会有所帮助?我还想知道您是否甚至需要调用终止,如果只是执行 sys.exit() 将正确地垃圾收集池及其所有进程。
  • 当我尝试启用日志记录时,我在控制台中得到了这个:“找不到记录器“多处理”的处理程序。不幸的是,p.terminate() 之后的 p.join() 没有' t 有所作为,并且 sys.exit() 关闭 python 但让进程在后台运行。
  • 试试multiprocessing.log_to_stderr().setLevel(logging.DEBUG)render() 是否启动其他进程,例如使用 subprocess 模块?

标签: python multiprocessing python-multiprocessing pool


【解决方案1】:

我找到了解决方案:在单独的线程中停止池,如下所示:

def close_pool():
    global pool
    pool.close()
    pool.terminate()
    pool.join()

def term(*args,**kwargs):
    sys.stderr.write('\nStopping...')
    # httpd.shutdown()
    stophttp = threading.Thread(target=httpd.shutdown)
    stophttp.start()
    stoppool=threading.Thread(target=close_pool)
    stoppool.daemon=True
    stoppool.start()


signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)

工作正常,我一直在测试。

signal.SIGINT

从键盘中断 (CTRL + C)。默认操作是引发 KeyboardInterrupt。

signal.SIGKILL

终止信号。它不能被捕获、阻止或忽略。

signal.SIGTERM

终止信号。

signal.SIGQUIT

退出核心转储。

【讨论】:

  • SIGTERMSIGINTSIGQUIT 是什么?
  • 它用于与ctrl+c、task kill、sys.exit() 等打断。
【解决方案2】:
# -*- coding:utf-8 -*-
import multiprocessing
import time
import sys
import threading
from functools import partial


#> work func
def f(a,b,c,d,e):
    print('start')
    time.sleep(4)
    print(a,b,c,d,e)

###########> subProcess func
#1. start a thead for work func
#2. waiting thead with a timeout
#3. exit the subProcess
###########
def mulPro(f, *args, **kwargs):
    timeout = kwargs.get('timeout',None)

    #1. 
    t = threading.Thread(target=f, args=args)
    t.setDaemon(True)
    t.start()
    #2. 
    t.join(timeout)
    #3. 
    sys.exit()

if __name__ == "__main__":

    p = multiprocessing.Pool(5)
    for i in range(5):
        #1. process the work func with "subProcess func"
        new_f = partial(mulPro, f, timeout=8)
        #2. fire on
        p.apply_async(new_f, args=(1,2,3,4,5),)

        # p.apply_async(f, args=(1,2,3,4,5), timeout=2)
    for i in range(10):
        time.sleep(1)
        print(i+1,"s")

    p.close()
    # p.join()

【讨论】:

    【解决方案3】:

    找到了我自己问题的答案。主要问题是我调用的是第三方应用程序而不是函数。当我调用子进程[使用 call() 或 Popen()] 时,它会创建一个新的 python 实例,其唯一目的是调用新的应用程序。但是当 python 退出时,它会杀死这个新的 python 实例并让应用程序继续运行。

    解决方案是通过找到所创建的 python 进程的 pid,获取该 pid 的子进程并杀死它们来执行此操作。此代码特定于 osx;有更简单的代码(不依赖于 grep)可用于 linux。

    for process in pool:
        processId = process.pid
        print "attempting to terminate "+str(processId)
        command = " ps -o pid,ppid -ax | grep "+str(processId)+" | cut -f 1 -d \" \" | tail -1"
        ps_command = Popen(command, shell=True, stdout=PIPE)
        ps_output = ps_command.stdout.read()
        retcode = ps_command.wait()
        assert retcode == 0, "ps command returned %d" % retcode
        print "child process pid: "+ str(ps_output)
        os.kill(int(ps_output), signal.SIGTERM)
        os.kill(int(processId), signal.SIGTERM)
    

    【讨论】:

      【解决方案4】:

      如果您仍然遇到此问题,您可以尝试使用daemonic processes 模拟Pool(假设您从非守护进程启动池/进程)。我怀疑这是最好的解决方案,因为您的 Pool 进程似乎应该退出,但这是我能想到的。我不知道你的回调是做什么的,所以我不确定在下面的示例中将它放在哪里。

      我还建议尝试在__main__ 中创建您的Pool,因为我的经验(和文档)在全局生成进程时会发生怪异。如果您使用的是 Windows,则尤其如此:http://docs.python.org/2/library/multiprocessing.html#windows

      from multiprocessing import Process, JoinableQueue
      
      # the function for each process in our pool
      def pool_func(q):
          while True:
              allRenderArg, otherArg = q.get() # blocks until the queue has an item
              try:
                  render(allRenderArg, otherArg)
              finally: q.task_done()
      
      # best practice to go through main for multiprocessing
      if __name__=='__main__':
          # create the pool
          pool_size = 2
          pool = []
          q = JoinableQueue()
          for x in range(pool_size):
              pool.append(Process(target=pool_func, args=(q,)))
      
          # start the pool, making it "daemonic" (the pool should exit when this proc exits)
          for p in pool:
              p.daemon = True
              p.start()
      
          # submit jobs to the queue
          for i in range(totalInstances):
              q.put((allRenderArgs[i], args[2]))
      
          # wait for all tasks to complete, then exit
          q.join()
      

      【讨论】:

      • 有趣!关于在主而不是全局定义的好技巧。我以这种方式重建并没有解决我的问题(见下文),但我更喜欢这种结构。谢谢!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-05
      • 2015-11-10
      • 1970-01-01
      • 2022-11-30
      • 2018-11-05
      相关资源
      最近更新 更多