【问题标题】:Python: Queue.get() blocks code from checking socket connectionPython:Queue.get() 阻止代码检查套接字连接
【发布时间】:2026-02-04 00:00:01
【问题描述】:

我正在运行一个线程,该线程从队列中获取消息,将其发送给客户端,然后接收确认。如果客户端断开连接,线程会捕获套接字错误并终止。问题是如果 msgQ 为空,则线程永远不会检查套接字连接。有没有办法设置此代码,即使队列为空,也会检查套接字? (问题是,没有消息,就没有什么可发送的)

我需要发送一个特殊的乒乓球吗?如果 msgQ 为空,则消息(并在客户端上检查消息是日志数据还是 ping?)?任何帮助,将不胜感激。

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty: pass

【问题讨论】:

  • self.sock.recv() 是一个阻塞调用,除非你有非阻塞套接字。这意味着如果队列中没有消息,则发出 self.sock.recv() 将阻塞并且在收到某些内容之前不会继续处理。这是你想做的吗?

标签: python sockets queue


【解决方案1】:

如果msgQ 为空,则对self.msgQ.get() 的调用会引发Empty 异常并完全跳过对self.sock.send()self.sock.recv() 的调用。 Empty 异常的异常处理程序不执行任何操作,因此您的代码将 busy wait 直到 msgQ 中出现某些内容,而无需调用 sendrecv

一种可能的解决方案是使用 python select module 检查异常处理程序中的套接字。大致如下:

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty:
            results = select.select([], [], [self.sock], 0.5) # timeout of 0.5 seconds
            if self.sock in results[2]:
                print 'exceptional condition on socket'
                self.terminate()
                break

【讨论】:

  • 这段代码到底是做什么的?我目前的解决方法只是发送一个特殊的“ping?” Queue.Empty 情况下的套接字消息(并有适当的客户端处理“ping?”与真实数据)。
  • select.select 的调用检查套接字上的异常情况。异常情况的确切定义取决于您的系统 - 查看 python 文档以获取更多信息。重点是查找套接字上发生的事件,而无需调用recv,因为可能没有任何数据要接收。 select 函数返回对应于三个参数列表的 3 个列表的元组。如果参数列表中的套接字出现在结果列表中,则该套接字上发生了某些事情。
  • 感谢您的信息,您赢了。
【解决方案2】:

如果您使用Queue.Queue,您可以使用可选的blocktimeout args 调用Queue.get(),或者调用Queue.get_nowait(),它相当于Queue.get(block=False),如果队列为空,将立即返回:

try:
    # wait for 1/10 second then return
    msgs = self.msgQ.get(timeout=0.1)
except Queue.Empty, qe:
    # handle empty queue 

【讨论】:

    【解决方案3】:

    我有一个想法,就是将 Queue 和 Pipe 封装到一个类中。然后使用 select 模块检查套接字和管道,不会使用轮询来监视事件。

    以下包装队列和管道的示例代码:

    class MyQueue(object):
    """docstring for MyQueue"""
    def __init__(self, arg):
        super(MyQueue, self).__init__()
        # self.arg = arg
        self._queue = Queue.Queue()
        self._rdfd, self._wrfd = os.pipe()
        return
    
    def enQ(self, item=None, block=True, timeout=None):
        os.write(self._wrfd, '+')
        self._queue.put(item)
        return
    
    def deQ(self, block=True, timeout=None):
        _itm = self._queue.get(block=block, timeout=timeout)
        _msg = os.read(self._rdfd, 1)
        return _itm
    
    def get_notify(self):
        return self._rdfd
    

        print 'append queue'
        self.inp.append(self._myq.get_notify())
        print 'append socket'
        self.inp.append(self.sock)
        print self.inp
    
        _slp = self.slp
        _inp = self.inp
        _out = self.out
        _err = self.err
        _in, _out, _err = select.select(_inp, _out, _err, _slp)
    

    【讨论】: