【发布时间】:2015-05-08 15:42:52
【问题描述】:
我有一个 python 工具,基本上有这种设置:
main process (P1) -> spawns a process (P2) that starts a tcp connection
-> spawns a thread (T1) that starts a loop to receive
messages that are sent from P2 to P1 via a Queue (Q1)
server process (P2) -> spawns two threads (T2 and T3) that start loops to
receive messages that are sent from P1 to P2 via Queues (Q2 and Q3)
我遇到的问题是,当我停止程序(使用 Ctrl+C)时,它并没有退出。服务器进程结束了,但是主进程就挂在那里了,我必须杀掉它。
线程循环函数看起来都一样:
def _loop(self):
while self.running:
res = self.Q1.get()
if res is None:
break
self._handle_msg(res)
所有线程都作为守护进程启动:
t = Thread(target=self._loop)
t.setDaemon(True)
t.start()
在我的主进程中,我使用 atexit 来执行清理任务:
atexit.register(self.on_exit)
这些清理任务基本上如下:
1) 将 P1 中的 self.running 设置为 False 并将 None 发送到 Q1,以便线程 T1 应该完成
self.running = False
self.Q1.put(None)
2) 通过 Q2 向 P2 发送消息,通知该进程正在结束
self.Q2.put("stop")
3) 在 P2 中,对“停止”消息做出反应并执行我们在 P1 中所做的操作
self.running = False
self.Q2.put(None)
self.Q3.put(None)
就是这样,据我了解,这应该可以很好地关闭所有内容,但事实并非如此。
P1的主代码还包含以下无限循环,否则程序会提前结束:
while running:
sleep(1)
也许这与问题有关,但我不明白为什么会这样。
那么我做错了什么?我的设置是否存在重大设计缺陷?我是不是忘记关闭某些东西了?
编辑
好的,我修改了我的代码并设法让它在大多数情况下正确关闭。不幸的是,从现在开始,它仍然卡住了。
我设法编写了我的代码的一个小型工作示例。为了演示发生了什么,您需要简单地启动脚本,然后使用Ctrl + C 来停止它。如果您在启动该工具后尽快按Ctrl + C,现在似乎通常会出现此问题。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import sys
import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
logger = logging.getLogger("mepy-client")
class SocketClientProtocol(object):
def __init__(self, q_in, q_out, q_binary):
self.q_in = q_in
self.q_out = q_out
self.q_binary = q_binary
self.running = True
t = Thread(target=self._loop)
#t.setDaemon(True)
t.start()
t = Thread(target=self._loop_binary)
#t.setDaemon(True)
t.start()
def _loop(self):
print "start of loop 2"
while self.running:
res = self.q_in.get()
if res is None:
break
self._handle_msg(res)
print "end of loop 2"
def _loop_binary(self):
print "start of loop 3"
while self.running:
res = self.q_binary.get()
if res is None:
break
self._handle_binary(res)
print "end of loop 3"
def _handle_msg(self, msg):
msg_type = msg[0]
if msg_type == "stop2":
print "STOP RECEIVED"
self.running = False
self.q_in.put(None)
self.q_binary.put(None)
def _put_msg(self, msg):
self.q_out.put(msg)
def _handle_binary(self, data):
pass
def handle_element(self):
self._put_msg(["something"])
def run_twisted(q_in, q_out, q_binary):
s = SocketClientProtocol(q_in, q_out, q_binary)
while s.running:
sleep(2)
s.handle_element()
class MediatorSender(object):
def __init__(self):
self.q_in = None
self.q_out = None
self.q_binary = None
self.p = None
self.running = False
def start(self):
if self.running:
return
self.running = True
self.q_in = Queue()
self.q_out = Queue()
self.q_binary = Queue()
print "!!!!START"
self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary))
self.p.start()
t = Thread(target=self._loop)
#t.setDaemon(True)
t.start()
def stop(self):
print "!!!!STOP"
if not self.running:
return
print "STOP2"
self.running = False
self.q_out.put(None)
self.q_in.put(["stop2"])
#self.q_in.put(None)
#self.q_binary.put(None)
try:
if self.p and self.p.is_alive():
self.p.terminate()
except:
pass
def _loop(self):
print "start of loop 1"
while self.running:
res = self.q_out.get()
if res is None:
break
self._handle_msg(res)
print "end of loop 1"
def _handle_msg(self, msg):
self._put_msg(msg)
def _put_msg(self, msg):
self.q_in.put(msg)
def _put_binary(self, msg):
self.q_binary.put(msg)
def send_chunk(self, chunk):
self._put_binary(chunk)
running = True
def signal_handler(signal, frame):
global running
if running:
running = False
ms.stop()
else:
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
ms = MediatorSender()
ms.start()
for i in range(100):
ms.send_chunk("some chunk of data")
while running:
sleep(1)
【问题讨论】:
-
如果您可以编写一个完整的程序来演示该问题,而不仅仅是包含 sn-ps,那将会很有帮助。否则我们很难知道我们是否真的在重现你正在做的事情。
-
你用的是什么版本的 Python?
-
@basilikum 你是在windows还是linux上?
-
@dano 我正在使用 Python 2.7。我正在尝试明天获得一个最小的工作示例。不确定它是否有效,但我们会看到。我只是认为我可能已经做了一些很容易发现的根本性错误。但我想情况并非如此。
-
@TysonU 我在 Linux 上
标签: python multithreading multiprocessing