【问题标题】:Twisted: how to read from a client socket after writing data to the same socket?Twisted:将数据写入同一个套接字后如何从客户端套接字读取?
【发布时间】:2015-11-18 11:44:54
【问题描述】:

我正在尝试用 twisted 编写一个简单的 TCP 服务器,它必须按顺序执行以下操作:

  1. 客户端连接到服务器,并且此连接的 KEEPALIVE 标志设置为 1。
  2. 服务器从客户端接收数据。
  3. 然后它计算一个列表的响应。
  4. 然后,服务器在等待来自客户端的显式 ACK 的同时,一个接一个地发送列表中的每个项目,即,在从列表中发送单个项目之后,服务器等待来自客户端的 ACK 数据包,并且只有在收到 ACK 后,它是否会继续以相同的方式发送其余项目。

以下是代码:

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyProtocol(Protocol):

    def connectionMade(self):
         try:
             self.transport.setTcpKeepAlive(1)
         except AttributeError: 
             pass
         self.deferred = Deferred()
         self.deferred.addCallback(self.factory.service.compute_response)
         self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
         self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return   

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return

在运行服务器和客户端时,我得到以下异常:

Resource temporarily unavailable

我了解此异常的原因 - 我正在尝试在非阻塞套接字上调用阻塞方法。

请帮助我找到解决此问题的方法。

【问题讨论】:

    标签: python sockets tcp twisted twisted.internet


    【解决方案1】:

    您的示例存在一些问题:

    1. 您没有在任何地方(除其他外)定义compute_response,因此我无法运行您的示例。考虑将其设为http://sscce.org
    2. 您永远不应该在 Twisted 传输底层的套接字上调用 sendrecv;让 Twisted 为您调用这些方法。在recv 的情况下,它将recv 的结果传递给dataReceived
    3. 您不能依赖dataReceived 来接收整个消息;数据包可能是always be arbitrarily fragmented in transit,所以你需要一个框架协议来封装你的消息。

    但是,由于我的其他答案非常拙劣,我欠你一个更彻底的解释,说明如何设置你想要做的事情。

    正如您的问题所规定的,您的协议没有完全定义到足以给出答案;您不能使用原始 TCP 片段进行请求和响应,因为您的应用程序无法知道它们的开始和结束位置(参见上面的第 3 点)。所以,我为这个例子发明了一个小协议:它是一个行分隔的协议,客户端发送"request foo\n",服务器立即发送"thinking...\n",计算一个响应,然后发送"response foo\n"并等待客户端发送"ok";作为响应,服务器将发送下一个"response ..." 行,或者发送一个"done\n" 行,指示它已完成发送响应。

    作为我们的协议,我相信您问题的关键要素是您不能在 Twisted 中“等待确认”,或者就此而言,其他任何事情。您需要做的是按照“收到确认时......”的方式实现一些东西。

    因此,当收到消息时,我们需要识别消息的类型:确认还是请求?

    1. 如果是请求,我们需要计算响应;当响应完成计算后,我们需要将响应的所有元素排入队列并发送第一个。
    2. 如果是确认,我们需要检查响应的传出队列,如果它有任何内容,则发送它的第一个元素;否则,发送“完成”。

    这是一个完整的、可运行的示例,它实现了我以这种方式描述的协议:

    from twisted.internet.protocol import ServerFactory
    from twisted.internet.task import deferLater
    from twisted.internet import reactor
    from twisted.internet.interfaces import ITCPTransport
    from twisted.protocols.basic import LineReceiver
    
    class MyProtocol(LineReceiver):
        delimiter = "\n"
        def connectionMade(self):
            if ITCPTransport.providedBy(self.transport):
                self.transport.setTcpKeepAlive(1)
            self.pendingResponses = []
    
        def lineReceived(self, line):
            split = line.rstrip("\r").split(None, 1)
            command = split[0]
            if command == b"request":
                # requesting a computed response
                payload = split[1]
                self.sendLine("thinking...")
                (self.factory.service.computeResponse(payload)
                 .addCallback(self.sendResponses))
            elif command == b"ok":
                # acknowledging a response; send the next response
                if self.pendingResponses:
                    self.sendOneResponse()
                else:
                    self.sendLine(b"done")
    
        def sendOneResponse(self):
            self.sendLine(b"response " + self.pendingResponses.pop(0))
    
        def sendResponses(self, listOfResponses):
            self.pendingResponses.extend(listOfResponses)
            self.sendOneResponse()
    
    class MyFactory(ServerFactory):
        protocol = MyProtocol
    
        def __init__(self, service):
            self.service = service
    
    class MyService(object):
        def computeResponse(self, request):
            return deferLater(
                reactor, 1.0,
                lambda: [request + b" 1", request + b" 2", request + b" 3"]
            )
    
    from twisted.internet.endpoints import StandardIOEndpoint
    endpoint = StandardIOEndpoint(reactor)
    
    endpoint.listen(MyFactory(MyService()))
    reactor.run()
    

    我已经让这个可以在标准 I/O 上运行,这样你就可以运行它并输入它来感受它是如何工作的;如果您想在实际的网络端口上运行它,只需将其替换为different type of endpoint。希望这能回答您的问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-03-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-07-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多