【Question Title】: Python multiprocessing/threading cleanup
【Posted】: 2015-05-08 15:42:52
【Question】:

I have a Python tool that basically has this setup:

main process (P1) -> spawns a process (P2) that starts a tcp connection
                  -> spawns a thread (T1) that starts a loop to receive 
                     messages that are sent from P2 to P1 via a Queue (Q1)

server process (P2) -> spawns two threads (T2 and T3) that start loops to
                       receive messages that are sent from P1 to P2 via Queues (Q2 and Q3)

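The problem I have is that when I stop the program (with Ctrl+C), it doesn't exit. The server process ends, but the main process just hangs there and I have to kill it.

The thread loop functions all look the same: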

def _loop(self):
    while self.running:
        res = self.Q1.get()
        if res is None:
            break
        self._handle_msg(res)

All threads are started as daemons:

t = Thread(target=self._loop)
t.setDaemon(True)
t.start()

In my main process I use atexit to perform cleanup tasks:

atexit.register(self.on_exit)

These cleanup tasks are essentially the following:

1) Set self.running in P1 to False and send None to Q1, so that thread T1 should finish

self.running = False
self.Q1.put(None)

2) Send a message to P2 via Q2 to tell that process it is shutting down

self.Q2.put("stop")

3) In P2, react to the "stop" message and do the same thing we did in P1

self.running = False
self.Q2.put(None)
self.Q3.put(None)

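That's it, and as I understand it, this should shut everything down nicely, but it doesn't.

The main code of P1 also contains the following endless loop, because otherwise the program would end prematurely: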

while running:
    sleep(1)

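Maybe this has something to do with the problem, but I can't see why it would.

So what am I doing wrong? Does my setup have a major design flaw? Did I forget to shut something down?

EDIT

OK, I modified my code and managed to make it shut down correctly most of the time. Unfortunately, every now and then it still gets stuck.

I managed to write a small working example of my code. To demonstrate what happens, simply start the script and then use Ctrl+C to stop it. The problem now seems to occur mostly if you press Ctrl+C as soon as possible after starting the tool.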

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import sys
import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


logger = logging.getLogger("mepy-client")


class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()
        t = Thread(target=self._loop_binary)
        #t.setDaemon(True)
        t.start()

    def _loop(self):
        print "start of loop 2"
        while self.running:
            res = self.q_in.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 2"

    def _loop_binary(self):
        print "start of loop 3"
        while self.running:
            res = self.q_binary.get()
            if res is None:
                break
            self._handle_binary(res)
        print "end of loop 3"

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            print "STOP RECEIVED"
            self.running = False
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    while s.running:
        sleep(2)
        s.handle_element()


class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print "!!!!START"
        self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        t = Thread(target=self._loop)
        #t.setDaemon(True)
        t.start()

    def stop(self):
        print "!!!!STOP"
        if not self.running:
            return
        print "STOP2"
        self.running = False
        self.q_out.put(None)
        self.q_in.put(["stop2"])
        #self.q_in.put(None)
        #self.q_binary.put(None)

        try:
            if self.p and self.p.is_alive():
                self.p.terminate()
        except:
            pass

    def _loop(self):
        print "start of loop 1"
        while self.running:
            res = self.q_out.get()
            if res is None:
                break
            self._handle_msg(res)
        print "end of loop 1"

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

running = True
def signal_handler(signal, frame):
    global running
    if running:
        running = False
        ms.stop()
    else:
        sys.exit(0)

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    ms = MediatorSender()
    ms.start()
    for i in range(100):
        ms.send_chunk("some chunk of data")
    while running:
        sleep(1)

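【Comments】:

  • It would be helpful if you could write a complete program that demonstrates the problem, rather than just including snippets. Otherwise it's hard for us to know whether we're really reproducing what you're doing.
  • What version of Python are you using?
  • @basilikum Are you on Windows or Linux?
  • @dano I'm using Python 2.7. I'll try to put together a minimal working example tomorrow. Not sure if it'll work, but we'll see. I just thought I might have made some fundamental mistake that is easy to spot. But I guess that's not the case.
  • @TysonU I'm on Linux

Tags: python multithreading multiprocessing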


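【Answer 1】:

I think you're corrupting your multiprocessing.Queue by calling p.terminate() on the child process. The documentation has a warning about this:

Warning: If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.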

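In some cases, it looks like p is terminated before your MediatorSender._loop method can consume the sentinel you loaded into the queue to let it know it should exit.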
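As a rough illustration of the alternative to calling terminate() right away, the sketch below sends the sentinel first, waits for the child to exit on its own, and only terminates if that fails. The names worker, stop_gracefully and demo are made up for this example, and it assumes a POSIX system where the "fork" start method is available, plus a small backlog of pending output (with a large backlog you would drain q_out before joining).

```python
import multiprocessing as mp

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")


def worker(q_in, q_out):
    """Hypothetical child: upper-case every message until None arrives."""
    for msg in iter(q_in.get, None):
        q_out.put(msg.upper())
    q_out.put(None)  # tell the parent that nothing more is coming


def stop_gracefully(proc, q_in, timeout=5.0):
    """Ask the child to exit; terminate only if it does not comply in time.

    Returns True if the child exited on its own.
    """
    q_in.put(None)        # sentinel: the child leaves its loop
    proc.join(timeout)    # give it time to flush its queue and exit
    if proc.is_alive():   # last resort; the queues may be unusable afterwards
        proc.terminate()
        proc.join()
        return False
    return True


def demo():
    q_in, q_out = ctx.Queue(), ctx.Queue()
    proc = ctx.Process(target=worker, args=(q_in, q_out))
    proc.start()
    q_in.put("hello")
    clean = stop_gracefully(proc, q_in)
    results = list(iter(q_out.get, None))  # drain up to the child's sentinel
    return clean, results


if __name__ == "__main__":
    print(demo())
```

The point of the ordering is that the child gets a chance to leave its loop and flush its queue before anything forceful happens.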

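Also, you're installing a signal handler that expects to run only in the main process, but SIGINT is actually received by both the parent and the child process. This means signal_handler gets called in both processes, which can result in ms.stop being called twice, because of a race condition in the way you handle setting ms.running to False.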
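One possible way to make a stop routine tolerate repeated calls within a single process is to guard it with a one-shot latch. This is only a sketch: OnceStopper is a hypothetical helper, and queue.Queue stands in for multiprocessing.Queue to keep the example self-contained (on Python 2.7 the module is named Queue). It does not address the two processes each running their own copy of the handler.

```python
import queue
import threading


class OnceStopper(object):
    """Hypothetical helper: runs the shutdown step at most once."""

    def __init__(self, q_out):
        self.q_out = q_out
        self._latch = threading.Lock()  # acquired once, never released

    def stop(self):
        """Return True if this call did the shutdown, False if it was a repeat."""
        # A non-blocking acquire succeeds for exactly one caller, so the
        # check and the update cannot be separated by another stop() call.
        if not self._latch.acquire(False):
            return False
        self.q_out.put(None)  # sentinel for the receiving loop
        return True


if __name__ == "__main__":
    stopper = OnceStopper(queue.Queue())
    print(stopper.stop())  # first call performs the shutdown
    print(stopper.stop())  # second call is ignored
```

Using the lock as a latch avoids the separate "check the flag, then clear it" steps that make the original handler racy.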

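I would recommend simply taking advantage of the fact that both processes receive SIGINT, and having both the parent and the child handle KeyboardInterrupt directly. That way each one shuts itself down cleanly, rather than having the parent terminate the child. The following code demonstrates this, and in my tests it never hung. I've simplified your code in a few places, but functionally it is exactly the same: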

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        t = Thread(target=self._loop)
        t.start()
        t = Thread(target=self._loop_binary)
        t.start()

    def _loop(self):
        print("start of loop 2")
        for res in iter(self.q_in.get, None):
            self._handle_msg(res)
        print("end of loop 2")

    def _loop_binary(self):
        print("start of loop 3")
        for res in iter(self.q_binary.get, None):
            self._handle_binary(res)
        print("end of loop 3")

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def stop(self):
        print("STOP RECEIVED")
        self.q_in.put(None)
        self.q_binary.put(None)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    try:
        while True:
            sleep(2)
            s.handle_element()
    except KeyboardInterrupt:
        s.stop()

class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print("!!!!START")
        self.p = Process(target=run_twisted, 
                         args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        print("!!!!STOP")
        if not self.running:
            return
        print("STOP2")
        self.running = False
        self.q_out.put(None)

    def _loop(self):
        print("start of loop 1")
        for res in iter(self.q_out.get, None):
            self._handle_msg(res)
        print("end of loop 1")

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

if __name__ == "__main__":
    ms = MediatorSender()
    try:
        ms.start()
        for i in range(100):
            ms.send_chunk("some chunk of data")
        # On Python 2.7 you have to join with a timeout in a loop.
        # A bare join() blocks without letting the main thread
        # handle SIGINT, so the program hangs. This is a bug that
        # is fixed in Python 3.x.
        while ms.loop.is_alive():
            ms.loop.join(1)
    except KeyboardInterrupt:
        ms.stop()
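The loops in the code above rely on the two-argument form of iter(), which keeps calling a function until it returns the sentinel value. A minimal standalone sketch of that idiom follows; the names consume and demo are invented here, and queue.Queue with a plain thread stands in for multiprocessing.Queue and the child process.

```python
import queue
import threading


def consume(q, handled):
    # iter(q.get, None) calls q.get() repeatedly and stops as soon as
    # it returns None, so no separate "running" flag is needed.
    for item in iter(q.get, None):
        handled.append(item)


def demo():
    q = queue.Queue()
    handled = []
    t = threading.Thread(target=consume, args=(q, handled))
    t.start()
    for item in ("a", "b", "c"):
        q.put(item)
    q.put(None)  # sentinel: ends the loop in consume()
    t.join()
    return handled


if __name__ == "__main__":
    print(demo())
```

Because the sentinel is the only exit condition, every item queued before it is still handled, which is not guaranteed when a loop also checks a flag that another thread flips.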

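Edit:

If you prefer to use a signal handler rather than catching KeyboardInterrupt, you just need to make sure the child process uses its own signal handler instead of inheriting the parent's: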

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import logging
from functools import partial
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep

logger = logging.getLogger("mepy-client")

class SocketClientProtocol(object):

    def __init__(self, q_in, q_out, q_binary):
        self.q_in = q_in
        self.q_out = q_out
        self.q_binary = q_binary
        self.running = True
        t = Thread(target=self._loop)
        t.start()
        t = Thread(target=self._loop_binary)
        t.start()

    def _loop(self):
        print("start of loop 2")
        for res in iter(self.q_in.get, None):
            self._handle_msg(res)
        print("end of loop 2")

    def _loop_binary(self):
        print("start of loop 3")
        for res in iter(self.q_binary.get, None):
            self._handle_binary(res)
        print("end of loop 3")

    def _handle_msg(self, msg):
        msg_type = msg[0]
        if msg_type == "stop2":
            self.q_in.put(None)
            self.q_binary.put(None)

    def _put_msg(self, msg):
        self.q_out.put(msg)

    def stop(self):
        print("STOP RECEIVED")
        self.running = False
        self.q_in.put(None)
        self.q_binary.put(None)

    def _handle_binary(self, data):
        pass

    def handle_element(self):
        self._put_msg(["something"])

def run_twisted(q_in, q_out, q_binary):
    s = SocketClientProtocol(q_in, q_out, q_binary)
    signal.signal(signal.SIGINT, partial(signal_handler_child, s))
    while s.running:
        sleep(2)
        s.handle_element()

class MediatorSender(object):

    def __init__(self):
        self.q_in = None
        self.q_out = None
        self.q_binary = None
        self.p = None
        self.running = False

    def start(self):
        if self.running:
            return
        self.running = True
        self.q_in = Queue()
        self.q_out = Queue()
        self.q_binary = Queue()
        print("!!!!START")
        self.p = Process(target=run_twisted, 
                         args=(self.q_in, self.q_out, self.q_binary))
        self.p.start()
        self.loop = Thread(target=self._loop)
        self.loop.start()

    def stop(self):
        print("!!!!STOP")
        if not self.running:
            return
        print("STOP2")
        self.running = False
        self.q_out.put(None)

    def _loop(self):
        print("start of loop 1")
        for res in iter(self.q_out.get, None):
            self._handle_msg(res)
        print("end of loop 1")

    def _handle_msg(self, msg):
        self._put_msg(msg)

    def _put_msg(self, msg):
        self.q_in.put(msg)

    def _put_binary(self, msg):
        self.q_binary.put(msg)

    def send_chunk(self, chunk):
        self._put_binary(chunk)

def signal_handler_main(ms, *args):
    ms.stop()

def signal_handler_child(s, *args):
    s.stop()

if __name__ == "__main__":
    ms = MediatorSender()
    signal.signal(signal.SIGINT, partial(signal_handler_main, ms))
    ms.start()
    for i in range(100):
        ms.send_chunk("some chunk of data")
    while ms.loop.is_alive():
        ms.loop.join(9999999)  
    print('done main')

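【Comments】:

  • Great answer! I just tried your script, and it seems to work well with either a timeout for join() or while ms.running: sleep(1). Do you know whether there is any downside to simply using an arbitrarily large timeout? The only thing that worries me a bit is the KeyboardInterrupt exception. I suspect it only works when I actually press Ctrl+C, but not if I send SIGINT some other way? So is there a way to make this work with raw signals? For example, is there any way to distinguish the two processes in a signal handler?
  • @basilikum Using a very large timeout with join works just as well. If you want to use a signal handler instead of handling KeyboardInterrupt, you can use a separate signal handler in the child process. See my edit.
  • Great, that's exactly what I was looking for. Thanks!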
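On the question of telling the two processes apart inside one handler: a possible approach, different from the separate handlers used in the answer, is to record the parent's PID before the child is started and compare it with os.getpid() when the signal arrives. The sketch below assumes the child is created by fork and so inherits the handler, as in the Linux setup described in the question; make_sigint_handler is a hypothetical name, and queue.Queue stands in for multiprocessing.Queue.

```python
import os
import queue


def make_sigint_handler(main_pid, q_in, q_out, q_binary):
    """Build one handler that behaves differently in parent and child.

    main_pid is the PID recorded in the parent before the child starts.
    """
    def handler(signum, frame):
        if os.getpid() == main_pid:
            q_out.put(None)     # parent: end its receiving loop
        else:
            q_in.put(None)      # child: end both of its loops
            q_binary.put(None)
    return handler


if __name__ == "__main__":
    # Stand-in queues; a real program would pass multiprocessing.Queue objects.
    q_in, q_out, q_binary = queue.Queue(), queue.Queue(), queue.Queue()
    handler = make_sigint_handler(os.getpid(), q_in, q_out, q_binary)
    handler(None, None)  # simulate SIGINT arriving in the parent
    print(q_out.qsize(), q_in.qsize(), q_binary.qsize())
```

In a real program the handler would be installed in the parent with signal.signal(signal.SIGINT, handler) before the Process is started.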
【Answer 2】:

Maybe you should try catching the SIGINT signal, which is generated by Ctrl+C, using signal.signal, like this:

#!/usr/bin/env python
import signal
import sys
def signal_handler(signal, frame):
        print('You pressed Ctrl+C!')
        sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()

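Code borrowed from here

【Comments】:

  • Thanks, yes! That is actually what I do in my slightly improved code (see the edit). There are still problems every now and then, though.
  • Have you tried using put_nowait instead of put in the stop functions SocketClientProtocol._handle_msg and MediatorSender.stop?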
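【Answer 3】:

This usually works for me if I use the threading module. However, it won't work if you use multiprocessing. If you're running the script from a terminal, try running it in the background, like this: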

python scriptFoo.py &

Once the process is running, the shell prints its PID like this:

[1] 23107

Whenever you need to quit the script, you just type kill followed by the script's PID.

kill 23107

Press Enter again, and it should kill all the child processes and print this:

[1]+  Terminated              python scriptFoo.py

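As far as I know, you can't kill all child processes with 'Ctrl+C'.

【Comments】:

  • Thanks for your answer, but that's not really what I'm looking for. I don't want to change the way the tool has to be started or stopped. People should be able to simply use Ctrl+C to quit everything. I'm sure there must be a way to solve this.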