【问题标题】:Signal handling in multi-threaded Python多线程 Python 中的信号处理
【发布时间】:2014-09-05 00:12:18
【问题描述】:

这应该很简单,我很惊讶我无法在 stackoverflow 上找到已经回答的问题。

我有一个类似守护进程的程序,它需要响应 SIGTERM 和 SIGINT 信号才能与 upstart 一起正常工作。我读到最好的方法是在与主线程不同的线程中运行程序的主循环,并让主线程处理信号。然后,当接收到信号时,信号处理程序应该通过设置一个在主循环中例行检查的哨兵标志来告诉主循环退出。

我已经尝试过这样做,但它没有按我预期的方式工作。请看下面的代码:

from threading import Thread
import signal
import time
import sys

stop_requested = False    

def sig_handler(signum, frame):
    sys.stdout.write("handling signal: %s\n" % signum)
    sys.stdout.flush()

    global stop_requested
    stop_requested = True    

def run():
    sys.stdout.write("run started\n")
    sys.stdout.flush()
    while not stop_requested:
        time.sleep(2)

    sys.stdout.write("run exited\n")
    sys.stdout.flush()

signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)

t = Thread(target=run)
t.start()
t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()

我通过以下两种方式对此进行了测试:

1)

$ python main.py > output.txt&
[2] 3204
$ kill -15 3204

2)

$ python main.py
ctrl+c

在这两种情况下,我都希望将其写入输出:

run started
handling signal: 15
run exited
join completed

在第一种情况下,程序退出,但我看到的只是:

run started

在第二种情况下,当按下 ctrl+c 并且程序没有退出时,SIGTERM 信号似乎被忽略了。

我在这里错过了什么?

【问题讨论】:

标签: python linux multithreading


【解决方案1】:

问题在于,正如Execution of Python signal handlers 中所述:

Python 信号处理程序不会在低级 (C) 信号处理程序内执行。相反,低级信号处理程序设置一个标志,告诉虚拟机稍后执行相应的 Python 信号处理程序(例如在下一个字节码指令处)

纯粹用 C 语言实现的长时间运行的计算(例如对大量文本进行正则表达式匹配)可以在任意时间内不间断地运行,而不管接收到任何信号。计算完成时将调用 Python 信号处理程序。

您的主线程在 threading.Thread.join 上被阻塞,这最终意味着它在 C 中被 pthread_join 调用阻塞。当然,这不是“长时间运行的计算”,而是系统调用上的一个块……但是,在调用完成之前,您的信号处理程序无法运行。

而且,虽然在某些平台上pthread_join 会在信号上出现EINTR 失败,但在其他平台上则不会。在linux上,我相信这取决于你是选择BSD风格还是默认siginterrupt行为,但默认是no。


那么,你能做些什么呢?

嗯,我很确定changes to signal handling in Python 3.3 实际上改变了 Linux 上的默认行为,所以如果你升级了,你不需要做任何事情;只需在 3.3+ 下运行,您的代码就会按预期工作。至少它对我来说适用于 OS X 上的 CPython 3.4 和 Linux 上的 3.3。 (如果我错了,我不确定这是否是 CPython 中的错误,所以你可能想在 python-list 上提出它而不是打开一个问题......)

另一方面,在 3.3 之前,signal 模块绝对不会公开您自己解决此问题所需的工具。因此,如果您无法升级到 3.3,解决方案是等待可中断的东西,例如 ConditionEvent。子线程在退出之前通知事件,主线程在加入子线程之前等待事件。这绝对是hacky。而且我找不到任何可以保证它会有所作为的东西;它恰好在 OS X 上的 CPython 2.7 和 3.2 以及 Linux 上的 2.6 和 2.7 的各种版本中为我工作......

【讨论】:

  • “这绝对是 hacky”——我一般不会这么说。与简单地使用join 相比,在更高的抽象级别上同步线程是明智的。如果您的目标是等待线程退出(如这个特定示例),那么join 是正确的工具;如果您想等待工作负载完成,Condition 等更有意义。毕竟,工作负载可以(例如)在一个不会立即退出的池线程中执行。
【解决方案2】:

abarnert 的回答很到位。不过,我仍在使用 Python 2.7。为了自己解决这个问题,我编写了一个 InterruptableThread 类。

现在它不允许向线程目标传递额外的参数。 Join 也不接受超时参数。这只是因为我不需要这样做。如果需要,您可以添加它。如果您自己使用它,您可能希望删除输出语句。它们只是作为评论和测试的一种方式。

import threading
import signal
import sys

class InvalidOperationException(Exception):
    pass    

# noinspection PyClassHasNoInit
class GlobalInterruptableThreadHandler:
    threads = []
    initialized = False

    @staticmethod
    def initialize():
        signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler)
        signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler)
        GlobalInterruptableThreadHandler.initialized = True

    @staticmethod
    def add_thread(thread):
        if threading.current_thread().name != 'MainThread':
            raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.")

        if not GlobalInterruptableThreadHandler.initialized:
            GlobalInterruptableThreadHandler.initialize()

        GlobalInterruptableThreadHandler.threads.append(thread)

    @staticmethod
    def sig_handler(signum, frame):
        sys.stdout.write("handling signal: %s\n" % signum)
        sys.stdout.flush()

        for thread in GlobalInterruptableThreadHandler.threads:
            thread.stop()

        GlobalInterruptableThreadHandler.threads = []    

class InterruptableThread:
    def __init__(self, target=None):
        self.stop_requested = threading.Event()
        self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run)

    def run(self):
        pass

    def start(self):
        GlobalInterruptableThreadHandler.add_thread(self)
        self.t.start()

    def stop(self):
        self.stop_requested.set()

    def is_stop_requested(self):
        return self.stop_requested.is_set()

    def join(self):
        try:
            while self.t.is_alive():
                self.t.join(timeout=1)
        except (KeyboardInterrupt, SystemExit):
            self.stop_requested.set()
            self.t.join()

        sys.stdout.write("join completed\n")
        sys.stdout.flush()

该类有两种不同的使用方式。你可以继承InterruptableThread:

import time
import sys
from interruptable_thread import InterruptableThread

class Foo(InterruptableThread):
    def __init__(self):
        InterruptableThread.__init__(self)

    def run(self):
        sys.stdout.write("run started\n")
        sys.stdout.flush()
        while not self.is_stop_requested():
            time.sleep(2)

        sys.stdout.write("run exited\n")
        sys.stdout.flush()

sys.stdout.write("all exited\n")
sys.stdout.flush()

foo = Foo()
foo2 = Foo()
foo.start()
foo2.start()
foo.join()
foo2.join()

或者您可以像使用 threading.thread 一样使用它。不过,run 方法必须将 InterruptableThread 对象作为参数。

import time
import sys
from interruptable_thread import InterruptableThread

def run(t):
    sys.stdout.write("run started\n")
    sys.stdout.flush()
    while not t.is_stop_requested():
        time.sleep(2)

    sys.stdout.write("run exited\n")
    sys.stdout.flush()

t1 = InterruptableThread(run)
t2 = InterruptableThread(run)
t1.start()
t2.start()
t1.join()
t2.join()

sys.stdout.write("all exited\n")
sys.stdout.flush()

随心所欲。

【讨论】:

  • 欣赏这篇文章,但 Python 2.7 即将走向灭亡,因此您可能需要重新考虑 3.7 的解决方案
【解决方案3】:

我在这里遇到了同样的问题signal not handled when multiple threads join。看了abarnert的回答后,改用Python 3解决了问题。但我确实喜欢将我的所有程序都更改为 python 3。所以,我通过避免在发送信号之前调用线程 join() 来解决我的程序。下面是我的代码。

不是很好,但是在python 2.7中解决了我的程序。我的问题被标记为重复,所以我把我的解决方案放在这里。

import threading, signal, time, os


RUNNING = True
threads = []

def monitoring(tid, itemId=None, threshold=None):
    global RUNNING
    while(RUNNING):
        print "PID=", os.getpid(), ";id=", tid
        time.sleep(2)
    print "Thread stopped:", tid


def handler(signum, frame):
    print "Signal is received:" + str(signum)
    global RUNNING
    RUNNING=False
    #global threads

if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, handler)
    signal.signal(signal.SIGUSR2, handler)
    signal.signal(signal.SIGALRM, handler)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGQUIT, handler)

    print "Starting all threads..."
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60})
    thread1.start()
    threads.append(thread1)
    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60})
    thread2.start()
    threads.append(thread2)
    while(RUNNING):
        print "Main program is sleeping."
        time.sleep(30)
    for thread in threads:
        thread.join()

    print "All threads stopped."

【讨论】:

  • 在 Windows 上不起作用,除 SIGINT 之外的所有功能在 Python 3.6 的 Windows 上均不可用。
猜你喜欢
  • 2012-07-25
  • 2021-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多