【问题标题】:Python multiprocessing - Capturing signals to restart child processes or shut down parent processPython多处理 - 捕获信号以重新启动子进程或关闭父进程
【发布时间】:2017-03-20 02:33:57
【问题描述】:

我正在使用多处理库来生成两个子进程。我想确保只要父进程还活着,如果子进程死亡(收到 SIGKILL 或 SIGTERM),它们就会自动重新启动。另一方面,如果父进程收到 SIGTERM/SIGINT,我希望它终止所有子进程然后退出。

这就是我解决问题的方式:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    if signum == 17:

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

    else:

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    multiprocessing.active_children()

如果我向 counterProcess 发送 SIGKILL,它将正确重新启动。但是,向 helloProcess 发送 SIGKILL 也会重新启动 counterProcess 而不是 helloProcess?

如果我向父进程发送 SIGTERM,父进程将退出,但子进程成为孤立进程并继续。如何纠正这种行为?

【问题讨论】:

    标签: python linux signals python-multiprocessing


    【解决方案1】:

    代码有几个问题,所以我将依次讨论它们。

    如果我向 counterProcess 发送 SIGKILL,它将正确重新启动。但是,向 helloProcess 发送 SIGKILL 也会重新启动 counterProcess 而不是 helloProcess?

    这种特殊行为很可能是由于您的主进程中缺少阻塞调用,因为multiprocessing.active_children() 并没有真正充当一个。我无法真正解释程序行为方式的确切原因,但在__main__ 函数中添加阻塞调用,例如。

    while True:
        time.sleep(1)
    

    解决问题。

    另一个相当严重的问题是将对象传递给处理程序的方式:

    helloProcess = HelloWorld()
    ...
    partial(signal_handler, helloProcess, counterProcess)
    

    考虑到你在里面创建了新对象,这已经过时了:

    if not helloProcess.is_alive():
        print "Restarting helloProcess"
    
        helloProcess = HelloWorld()
        helloProcess.start()
    

    请注意,这两个对象对HelloWorld() 对象使用不同的别名。部分对象绑定到 __main__ 函数中的别名,而回调中的对象绑定到其本地范围别名。因此,通过将新对象分配给本地范围别名,您不会真正影响回调绑定到的对象(它仍然绑定到在__main__ 范围内创建的对象)。

    您可以通过在回调范围内以相同的方式将信号回调与新对象重新绑定来修复它:

    def signal_handler(...):
        ...
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))
        ...
    

    然而,这会导致另一个陷阱,因为现在每个子进程都会从父进程继承回调,并在每次收到信号时访问它。要修复它,您可以在创建子进程之前将信号处理程序临时设置为默认值:

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, SIG_DFL)
    

    最后,您可能希望在终止子进程之前压制来自子进程的任何信号,否则它们会再次触发回调:

    signal(SIGCHLD, SIG_IGN)
    

    请注意,您可能希望重新设计应用程序的架构并利用multiprocessing 提供的一些功能。

    最终代码:

    import sys
    import time
    from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
    from functools import partial
    import multiprocessing
    #import setproctitle
    
    class HelloWorld(multiprocessing.Process):
        def __init__(self):
            super(HelloWorld, self).__init__()
    
            # ignore, let parent handle it
            #signal(SIGTERM, SIG_IGN)
    
        def run(self):
    
            #setproctitle.setproctitle("helloProcess")
    
            while True:
                print "Hello World"
                time.sleep(1)
    
    class Counter(multiprocessing.Process):
        def __init__(self):
            super(Counter, self).__init__()
    
            self.counter = 1
    
            # ignore, let parent handle it
            #signal(SIGTERM, SIG_IGN)
    
        def run(self):
    
            #setproctitle.setproctitle("counterProcess")
    
            while True:
                print self.counter
                time.sleep(1)
                self.counter += 1
    
    
    def signal_handler(helloProcess, counterProcess, signum, frame):
    
        print multiprocessing.active_children()
        print "helloProcess: ", helloProcess
        print "counterProcess: ", counterProcess
    
        print "current_process: ", multiprocessing.current_process()
    
        if signum == 17:
    
            # Since each new child inherits current signal handler,
            # temporarily set it to default before spawning new child.
            for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
                signal(signame, SIG_DFL)
    
            print "helloProcess: ", helloProcess.is_alive()
    
            if not helloProcess.is_alive():
                print "Restarting helloProcess"
    
                helloProcess = HelloWorld()
                helloProcess.start()
    
            print "counterProcess: ", counterProcess.is_alive()
    
            if not counterProcess.is_alive():
                print "Restarting counterProcess"
    
                counterProcess = Counter()
                counterProcess.start()
    
            # After new children are spawned, revert to old signal handling policy.
            for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
                signal(signame, partial(signal_handler, helloProcess, counterProcess))
    
    
        else:
    
            # Ignore any signal that child communicates before quit   
            signal(SIGCHLD, SIG_IGN) 
    
            if helloProcess.is_alive():
                print "Stopping helloProcess"
                helloProcess.terminate()
    
            if counterProcess.is_alive():
                print "Stopping counterProcess"
                counterProcess.terminate()
    
            sys.exit(0)
    
    
    
    if __name__ == '__main__':
    
        helloProcess = HelloWorld()
        helloProcess.start()
    
        counterProcess = Counter()
        counterProcess.start()
    
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))
    
        while True:
            print multiprocessing.active_children()
            time.sleep(1)
    

    【讨论】:

    • signal.SIGCHLD handler 和 multiprocessing.Process 不能很好地协同工作。在signal.SIGCHLD 处理程序中,Process.is_alive 即使在子进程终止后也会返回True
    【解决方案2】:

    要从signal.SIGCHLD 处理程序重新创建死去的孩子,母亲必须调用os.wait 函数之一,因为Process.is_alive 在这里不起作用。
    尽管有可能,但这很复杂,因为signal.SIGCHLD 是在其中一个孩子的状态发生变化时交付给母亲的。孩子收到signal.SIGSTOPsignal.SIGCONT 或任何其他终止信号。
    所以signal.SIGCHLD 处理程序必须区分孩子的这些状态。仅仅在signal.SIGCHLD 交付时重新创建孩子可能会创建不必要的孩子。

    以下代码使用os.waitpidos.WNOHANG 使其非阻塞,并使用os.WUNTRACEDos.WCONTINUED 来了解signal.SIGCHLD 是来自signal.SIGSTOP 还是signal.SIGCONT
    os.waitpid不起作用,即如果Process 实例中的任何一个是printed,则返回(0, 0),即str(Process()),然后再调用os.waitpid

    import sys
    import time
    from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
    import multiprocessing
    import os
    
    class HelloWorld(multiprocessing.Process):
        def run(self):
            # reset SIGTERM to default for Process.terminate to work
            signal(SIGTERM, SIG_DFL)
            while True:
                print "Hello World"
                time.sleep(1)
    
    class Counter(multiprocessing.Process):
        def __init__(self):
            super(Counter, self).__init__()
            self.counter = 1
    
        def run(self):
            # reset SIGTERM to default for Process.terminate to work
            signal(SIGTERM, SIG_DFL)
            while True:
                print self.counter
                time.sleep(1)
                self.counter += 1
    
    
    def signal_handler(signum, _):
        global helloProcess, counterProcess
    
        if signum == SIGCHLD:
            pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
            if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
                return
            if os.WIFSIGNALED(status) or os.WIFEXITED(status):
                if helloProcess.pid == pid:
                    print("Restarting helloProcess")
                    helloProcess = HelloWorld()
                    helloProcess.start()
    
                elif counterProcess.pid == pid:
                    print("Restarting counterProcess")
                    counterProcess = Counter()
                    counterProcess.start()
    
        else:
            # mother shouldn't be notified when it terminates children
            signal(SIGCHLD, SIG_DFL)
            if helloProcess.is_alive():
                print("Stopping helloProcess")
                helloProcess.terminate()
    
            if counterProcess.is_alive():
                print("Stopping counterProcess")
                counterProcess.terminate()
    
            sys.exit(0)
    
    if __name__ == '__main__':
    
        helloProcess = HelloWorld()
        helloProcess.start()
    
        counterProcess = Counter()
        counterProcess.start()
    
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, signal_handler)
    
        while True:
            pause()
    

    以下代码在不使用signal.SIGCHLD 的情况下重新创建死去的孩子。所以它比前者简单
    创建了两个子进程后,母进程为 SIGINT、SIGTERM、SIGQUIT 设置了一个名为 term_child 的信号处理程序。 term_child 在调用时终止并加入每个孩子。

    母进程不断检查子进程是否还活着,并在必要时在while 循环中重新创建它们。

    因为每个孩子都从母亲那里继承信号处理程序,所以SIGINT 处理程序应该重置为默认值,Process.terminate 才能工作

    import sys
    import time
    from signal import signal, SIGINT, SIGTERM, SIGQUIT
    import multiprocessing
    
    class HelloWorld(multiprocessing.Process):    
        def run(self):
            signal(SIGTERM, SIG_DFL)
            while True:
                print "Hello World"
                time.sleep(1)
    
    class Counter(multiprocessing.Process):
        def __init__(self):
            super(Counter, self).__init__()
            self.counter = 1
    
        def run(self):
            signal(SIGTERM, SIG_DFL)
            while True:
                print self.counter
                time.sleep(1)
                self.counter += 1
    
    def term_child(_, __):
        for child in children:
            child.terminate()
            child.join()
        sys.exit(0)
    
    if __name__ == '__main__':
    
        children = [HelloWorld(), Counter()]
        for child in children:
            child.start()
    
        for signame in (SIGINT, SIGTERM, SIGQUIT):
            signal(signame, term_child)
    
        while True:
            for i, child in enumerate(children):
                if not child.is_alive():
                    children[i] = type(child)()
                    children[i].start()
            time.sleep(1)
    

    【讨论】:

      猜你喜欢
      • 2017-08-22
      • 2016-08-01
      • 2012-09-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多