【问题标题】:How can I get a socket's .recv() not to block?如何让套接字的 .recv() 不阻塞?
【发布时间】:2022-01-16 13:23:26
【问题描述】:

我正在尝试编写一个简单的守护程序来侦听 Unix 套接字上的命令。以下工作,但 connection.recv(1024) 行阻塞,这意味着我不能优雅地终止服务器:

import socket, os

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
    server.bind("/tmp/sock")
    server.listen()
    connection, __ = server.accept()
    with connection:
        while True:
            data = connection.recv(1024)
            print("Hi!")  # This line isn't executed 'til data is sent
            if data:
                print(data.decode())

理想情况下,我想将所有这些都放在Thread 中,该self.should_stop 属性每self.LOOP_TIME 秒检查一次,如果该值设置为True,则退出。但是,由于 .recv() 行阻塞,我的程序除了在任何给定时间等待之外别无他法。

当然有一种正确的方法可以做到这一点,但由于我是套接字新手,我不知道那是什么。

编辑

Jeremy Friesner 的回答让我走上了正轨。我意识到我可以允许线程阻塞并简单地设置.should_stop,然后将b"" 传递给套接字,以便它解除阻塞,看到它应该停止,然后干净地退出。这是最终结果:

import os
import socket

from pathlib import Path
from shutil import rmtree
from threading import Thread


class MyThreadThing(Thread):

    RUNTIME_DIR = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "my-project-name"

    def __init__(self):

        super().__init__(daemon=True)

        self.should_stop = False

        if self.RUNTIME_DIR.exists():
            rmtree(self.RUNTIME_DIR)
        self.RUNTIME_DIR.mkdir(0o700)

        self.socket_path = self.RUNTIME_DIR / "my-project.sock"

    def run(self) -> None:

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind(self.socket_path.as_posix())
            s.listen()

            while True:

                connection, __ = s.accept()
                action = ""

                with connection:

                    while True:

                        received = connection.recv(1024).decode()
                        action += received
                        if not received:
                            break

                    # Handle whatever is in `action`

                if self.should_stop:
                    break

        self.socket_path.unlink()

    def stop(self):
        """
        Trigger this when you want to stop the listener.
        """
        self.should_stop = True
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.socket_path.as_posix())
            s.send(b"")

【问题讨论】:

  • 为什么不使用 asyncio?这样您就可以使用任务并以一种简单的方式停止执行,避免使用线程
  • 您可以在套接字上设置 os.O_NONBLOCK 选项,然后如果没有数据,它将立即返回。问题是您必须定期轮询套接字。这并不理想。你真正想要的是使用 select()
  • @Grismar 不幸的是没有。当我将blocking 设置为False 时,它会与BlockingIOError: [Errno 11] Resource temporarily unavailable 发生冲突。我可以将timeout 设置为非零值,但随后出现超时错误。
  • @eyllanesc 我的用例调用线程。

标签: python sockets


【解决方案1】:

使用任意长度的超时总是有点不满意——要么将超时值设置为相对较长的时间,在这种情况下,您的程序对退出请求的反应会变慢,因为它毫无意义地等待超时period to expire... 或者您将 timeout-value 设置为相对较短的时间,在这种情况下,您的程序会不断唤醒以查看它是否应该退出,从而 24/7 浪费 CPU 电源来检查可能永远不会到达的事件.

处理该问题的一种更优雅的方法是创建一个管道,并在您希望事件循环退出时在管道上发送一个字节。您的事件循环可以通过select() 同时“监视”管道的读取端文件描述符和网络套接字,并且当该文件描述符指示它已准备好读取时,您的事件循环可以响应通过退出。这种方法完全是事件驱动的,所以它不需要 CPU 唤醒,除非确实有事情要做。

以下是您的程序的示例版本,它实现了 SIGINT 的信号处理程序(也就是按 Control-C)以在管道上发送 please-quit-now 字节:

import socket, os
import select
import signal, sys

# Any bytes written to (writePipeFD) will become available for reading on (readPipeFD)
readPipeFD, writePipeFD = os.pipe()

# Set up a signal-handler to handle SIGINT (aka Ctrl+C) events by writing a byte to the pipe
def signal_handler(sig, frame):
    print("signal_handler() is executing -- SIGINT detected!")
    os.write(writePipeFD, b"\0")  # doesn't matter what we write; a single 0-byte will do
signal.signal(signal.SIGINT, signal_handler)

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as serverSock:
    serverSock.bind("/tmp/sock")
    serverSock.listen()

    # Wait for incoming connection (or the please-quit signal, whichever comes first)
    connection = None
    while True:
       readReady,writeReady,exceptReady = select.select([readPipeFD,serverSock], [], [])
       if (readPipeFD in readReady):
          print("accept-loop: Someone wrote a byte to the pipe; time to go away!");
          break
       if (connection in readReady):
          connection, __ = serverSock.accept()
          break

    # Read data from incoming connection (or the please-quit signal, whichever comes first)
    if connection:
       with connection:
          while True:
             readReady,writeReady,exceptReady = select.select([readPipeFD,connection], [], [])
             if (readPipeFD in readReady):
                print("Connection-loop: Someone wrote a byte to the pipe; time to go away!");
                break
             if (connection in readReady):
                data = connection.recv(1024)
                print("Hi!")  # This line isn't executed 'til data is sent
                if data:
                   print(data.decode())

    print("Bye!")

【讨论】:

    【解决方案2】:

    使用与LOOP_TIME 相同的超时时间,如下所示:

    import socket, os
    
    LOOP_TIME = 10
    should_stop = False
    
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
        server.bind("/tmp/sock")
        server.listen()
        connection, __ = server.accept()
        connection.settimeout(LOOP_TIME)
        with connection:
            while not should_stop:
                try:
                    data = connection.recv(1024)
                except socket.timeout:
                    continue
                print("Hi!")  # This line isn't executed 'til data is sent
                if data:
                    print(data.decode())
    

    你可以使用select,但是如果只是一个简单的socket,这种方式就简单了一点。

    您可以选择将其放置在带有 self.should_stop 的不同线程中或仅放在主线程中 - 它现在将侦听 KeyboardInterrupt。

    【讨论】:

      猜你喜欢
      • 2011-09-13
      • 1970-01-01
      • 2023-03-03
      • 2010-10-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多