【发布时间】:2014-10-16 16:18:27
【问题描述】:
我有一个 Twisted 应用程序正在侦听 Int32StringReceiver 消息,然后将它们重新发送到另一个应用程序。基本上,它是一个路由器,但它有一些智能,可以内省数据的去向。
我的问题是出站端,收到很多错误消息等。
入站是一个类Receiver(Int32StringReceiver):
def doActualForwarding(self, data):
self.stats.recvBits += 8 * (4 + len(data))
self.stats.recvMsgs += 1
dlen = len(data)
if dlen > 1024*256:
self.logger.info("router.Receiver.doActualForwarding(): data len: %s" % (dlen))
self.router.forward(data)
def stringReceived(self, data):
d = threads.deferToThread(self.doActualForwarding, data)
d.addCallback(self.forwardingDoneOkay)
d.addErrback(self.forwardingDoneError)
self.router 是实例化的对象,需要通过套接字通信以相同的格式发送这些消息。所以,它只是转身在路由器类中执行此操作:
def connect(self):
if self.sock:
try:
self.sock.close()
except:
pass
try:
self.stats.connectAttempts += 1
self.sock = socket.socket()
self.sock.settimeout(self.CONNECT_TIMEOUT)
self.sock.connect(self.destination)
self.sock.settimeout(self.SEND_TIMEOUT)
self.set_keepalive_linux(self.sock)
self.connected = True
self.log.info("connected to %s" % (self.destination,))
self.stats.reconnects += 1
self.stats.connectCompletes += 1
return True
except Exception, e:
self.connected = False
if not self.drop_ok:
self.log.error("connect %s: %s" % (self.destination, e))
return False
def send(self, msg):
trynum = 0
while trynum < self.MAX_SEND_ATTEMPTS:
self.logSent()
if not self.connected:
if not self.connect():
self.stats.badSends += 1
time.sleep(self.DELAY_BEFORE_RECONNECT)
continue
try:
if ((time.time() - self.lastReconnectTime) > self.RECONNECT_EVERY):
self.lastReconnectTime = time.time()
assert False, "Reconnecting with destination to redistribute load."
self.sock.sendall(msg)
#self.closeSocket()
self.stats.events += 1
return True
except Exception, e:
whichKind = None
if 'Broken pipe' in str(e):
self.stats.brokenPipe += 1
elif 'Resource temporarily unavilable' in str(e):
self.stats.resourceTempUnavail += 1
elif 'Bad file descriptor' in str(e):
self.stats.badFileDescriptor += 1
self.log.error("send: %s %s" % (str(self.destination), str(e)))
try:
self.sock.close()
except:
pass
self.connected = False
self.stats.badSends += 1
trynum += 1
if trynum == 1:
self.stats.eventsWithRetry += 1
if trynum > 1:
self.log.warning("recon_sender.send(): Trynum non-singular, was: %s" % (trynum))
return False
def __del__(self):
try:
self.sock.close()
except:
pass
问题:
Python 的 Socket 库是线程安全的吗?也就是说,在功能上,两个或多个线程都有一个指向对象路由器的指针。两个线程都在调用 self.sock.sendall(msg),我担心它们会互相踩踏。
一个症状是可能是连续的消息相互附加。我不确定这一点,但看起来是这样的。
-
我看到很多资源临时。 unavail(意味着目的地很忙),大约相同数量的损坏管道,以及少量的坏文件描述符。
- [Errno 9] 文件描述符错误
- [Errno 11] 资源暂时不可用
- [Errno 32] 断管
这些消息可能对应于通过这个东西的消息数量的 0.5% (.005)。
- 我尝试让每个发送都执行一次连接/发送/关闭/关闭,但这会导致大量有关“对等方重置连接”的消息。
似乎每个人都专注于处理套接字上的多线程接收的代码,但对套接字上的多线程发送的评论并不多。
-
我也尝试使用(可能不正确):
导入线程 self.lock = threading.Lock() 使用 self.lock: sock.sendall(msg)
但这会导致有关超时的错误消息(糟糕)。
- 有人能指出一些好的例子(或提供一些例子吗?!?!?!?),演示多线程套接字 sendall()?
【问题讨论】:
-
Python 本身通常不是线程安全的,因此您必须使用多处理来绕过 gil。
-
你推迟到一个线程有什么原因吗?这不是在 Twisted 中处理连接的常用方法,并且可能是问题的原因。
-
Twisted 不是线程安全的,通常没有理由使用线程在 Twisted 中发送网络流量。为什么要做线程和低级套接字 IO?真的不清楚你为什么不只是打电话给
transport.write...
标签: python multithreading sockets twisted