【发布时间】:2022-01-16 13:23:26
【问题描述】:
我正在尝试编写一个简单的守护程序来侦听 Unix 套接字上的命令。以下工作,但 connection.recv(1024) 行阻塞,这意味着我不能优雅地终止服务器:
import socket, os
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind("/tmp/sock")
server.listen()
connection, __ = server.accept()
with connection:
while True:
data = connection.recv(1024)
print("Hi!") # This line isn't executed 'til data is sent
if data:
print(data.decode())
理想情况下,我想将所有这些都放在Thread 中,该self.should_stop 属性每self.LOOP_TIME 秒检查一次,如果该值设置为True,则退出。但是,由于 .recv() 行阻塞,我的程序除了在任何给定时间等待之外别无他法。
当然有一种正确的方法可以做到这一点,但由于我是套接字新手,我不知道那是什么。
编辑
Jeremy Friesner 的回答让我走上了正轨。我意识到我可以允许线程阻塞并简单地设置.should_stop,然后将b"" 传递给套接字,以便它解除阻塞,看到它应该停止,然后干净地退出。这是最终结果:
import os
import socket
from pathlib import Path
from shutil import rmtree
from threading import Thread
class MyThreadThing(Thread):
RUNTIME_DIR = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "my-project-name"
def __init__(self):
super().__init__(daemon=True)
self.should_stop = False
if self.RUNTIME_DIR.exists():
rmtree(self.RUNTIME_DIR)
self.RUNTIME_DIR.mkdir(0o700)
self.socket_path = self.RUNTIME_DIR / "my-project.sock"
def run(self) -> None:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(self.socket_path.as_posix())
s.listen()
while True:
connection, __ = s.accept()
action = ""
with connection:
while True:
received = connection.recv(1024).decode()
action += received
if not received:
break
# Handle whatever is in `action`
if self.should_stop:
break
self.socket_path.unlink()
def stop(self):
"""
Trigger this when you want to stop the listener.
"""
self.should_stop = True
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(self.socket_path.as_posix())
s.send(b"")
【问题讨论】:
-
为什么不使用 asyncio?这样您就可以使用任务并以一种简单的方式停止执行,避免使用线程
-
您可以在套接字上设置 os.O_NONBLOCK 选项,然后如果没有数据,它将立即返回。问题是您必须定期轮询套接字。这并不理想。你真正想要的是使用 select()
-
@Grismar 不幸的是没有。当我将
blocking设置为False时,它会与BlockingIOError: [Errno 11] Resource temporarily unavailable发生冲突。我可以将timeout设置为非零值,但随后出现超时错误。 -
@eyllanesc 我的用例调用线程。