【问题标题】:process socket data that ends with a line break处理以换行符结尾的套接字数据
【发布时间】:2017-08-16 16:37:03
【问题描述】:

在我需要 var data 以换行符 \n 结束的情况下,处理套接字连接的最佳方法是什么? 我正在使用下面的代码,但有时tcp 数据包会分块,并且需要很长时间才能匹配data.endswith("\n")。 我还尝试了其他方法,例如如果最后一行不以\n 结尾,则保存它,并将其附加到下一个循环中的data。但这也不起作用,因为多个数据包被分块并且第一和第二部分不匹配。 我无法控制另一端,它基本上发送多行以\r\n 结尾。

欢迎提出任何建议,因为我对套接字连接了解不多。

def receive_bar_updates(s):
    global all_bars
    data = ''
    buffer_size = 4096
    while True:
        data += s.recv(buffer_size)
        if not data.endswith("\n"):
            continue
        lines = data.split("\n")
        lines = filter(None, lines)
        for line in lines:
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
        data = ""

正常”示例data:

line1\r\n
line2\r\n
line3\r\n

分块示例data:

line1\r\n
line2\r\n
lin

【问题讨论】:

    标签: python python-2.7 python-3.x sockets tcp


    【解决方案1】:

    如果您想将原始输入作为行处理,则 io 模块是您的朋友,因为它会在行中对数据包进行低级组装。

    你可以使用:

    class SocketIO(io.RawIOBase):
        def __init__(self, sock):
            self.sock = sock
        def read(self, sz=-1):
            if (sz == -1): sz=0x7FFFFFFF
            return self.sock.recv(sz)
        def seekable(self):
            return False
    

    它比endswith('\n') 更健壮,因为如果一个数据包包含嵌入的换行符('ab\ncd'),io 模块将正确处理它。你的代码可能变成:

    def receive_bar_updates(s):
        global all_bars
        data = ''
        buffer_size = 4096
        fd = SocketIO(s)  # fd can be used as an input file object
    
        for line in fd:
            if should_be_rejected_by_filter(line): continue # do not know what filter does...
            if line.startswith("BH") or line.startswith("BC"):
                symbol = str(line.split(",")[1])
                all_bars[symbol].append(line)
                y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                y.start()
    

    【讨论】:

    • 我不确定我是否收到“原始输入”,我已经用一些输入示例更新了问题@ 987654325@。 filter 删除列表中的空条目。
    • 数据包不包含嵌入换行符,只是一堆以\r\n 结尾的行,我需要处理data 如果最后一行是完整的(以\n 结尾),否则我会随机丢失最后几行,因为它们不完整。
    • @PedroLobito:TCP 是一种流协议。这意味着根据不同的设备(交换机、路由器、网关),数据包可能会被分割成更小的部分或重新排列成更大的部分。即使发送方仅发送以换行符结尾的数据包,也可以将 2 个或更多这些数据包组合成一个更大的数据包。该数据包将包含嵌入的换行符。
    • @PedroLobito: 并且 IOBase 将正确地重新组装 chunks 在线被分成 2 个不同的数据包。
    • @PedroLobito:除非重置连接,否则for line in fd 循环。
    【解决方案2】:

    使用socket.socket.makefile() 将套接字包装在实现Text I/O 的类中。它处理缓冲、字节和字符串之间的转换,并允许您遍历行。记得刷新所有写入。

    例子:

    #!/usr/bin/env python3
    import socket, threading, time
    
    
    def client(addr):
        with socket.create_connection(addr) as conn:
            conn.sendall(b'aaa')
            time.sleep(1)
            conn.sendall(b'bbb\n')
            time.sleep(1)
            conn.sendall(b'cccddd\n')
            time.sleep(1)
            conn.sendall(b'eeefff')
            time.sleep(1)
            conn.sendall(b'\n')
            conn.shutdown(socket.SHUT_WR)
            response = conn.recv(1024)
            print('client got %r' % (response,))
    
    
    def main():
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listen_socket:
            listen_socket.bind(('localhost', 0))
            listen_socket.listen(1)
            addr = listen_socket.getsockname()
            threading.Thread(target=client, args=(addr,)).start()
            conn, _addr = listen_socket.accept()
            conn_file = conn.makefile(mode='rw', encoding='utf-8')
            for request in conn_file:
                print('server got %r' % (request,))
            conn_file.write('response1\n')
            conn_file.flush()
    
    
    if __name__ == '__main__':
        main()
    
    $ ./example.py
    server got 'aaabbb\n'
    server got 'cccddd\n'
    server got 'eeefff\n'
    client got b'response1\n'
    $
    

    【讨论】:

      【解决方案3】:

      您是否接受不同的连接?或者它是一个数据流,由\r\n 分割?

      当接受多个连接时,您将等待与s.accept() 的连接,然后处理其所有数据。当您拥有所有数据包时,处理其数据并等待下一个连接。 然后你做什么取决于每个数据包的结构。 (例如:https://wiki.python.org/moin/TcpCommunication

      如果您使用的是数据流,您可能应该处理在单独线程中找到的每一“行”,同时继续使用另一个线程。

      编辑: 所以,如果我的情况正确;一个连接,数据是由\r\n 分解的字符串,以\n 结尾。然而,数据并不符合您的预期,而是无限循环等待\n

      据我了解,套接字接口以空数据结果结束。所以最后一个缓冲区可能以\n 结束,但随后继续获取None 对象,试图找到另一个\n

      请尝试添加以下内容:

      if not data:
          break
      

      完整代码:

      def receive_bar_updates(s):
          global all_bars
          data = ''
          buffer_size = 4096
          while True:
              data += s.recv(buffer_size)
              if not data:
                  break
              if not data.endswith("\n"):
                  continue
              lines = data.split("\n")
              lines = filter(None, lines)
              for line in lines:
                  if line.startswith("BH") or line.startswith("BC"):
                      symbol = str(line.split(",")[1])
                      all_bars[symbol].append(line)
                      y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                      y.start()
              data = ""
      

      Edit2:糟糕,代码错误

      【讨论】:

      • 我只接受一个连接,输入数据是一组以\r\n结尾的行,在确保最后一行以\n结尾后,我拆分了这些行。问题是代码卡在if not "\n" in data: continue 上,并且会循环很长时间,直到找到匹配项。
      • 代码在循环,在无穷无尽的 None 对象流中寻找\n。希望我的编辑有所帮助。
      • data 需要\n结束只包含\n
      • 是的,但是 None 对象永远不会以 \n 结尾。在你查找的实际情况之后,它会再次循环,只找到None对象并将它们添加到现在为空的data对象所以当你找到None对象时,停止查找,你就完成了。正如您所说,数据包可能会被分块。接受第一个连接后,您必须立即准备好获取下一个块(如果有的话)。在线程中处理新接受的连接,处理获得的数据,并将剩下的数据放入缓冲区以供下一个块使用。
      【解决方案4】:

      我没有测试过这段代码,但它应该可以工作:

      def receive_bar_updates(s):
          global all_bars
          data = ''
          buf = ''
          buffer_size = 4096
          while True:
              if not "\r\n" in data:  # skip recv if we already have another line buffered.
                  data += s.recv(buffer_size)
              if not "\r\n" in data:
                  continue
              i = data.rfind("\r\n")
              data, buf = data[:i+2], data[i+2:]
              lines = data.split("\r\n")
              lines = filter(None, lines)
              for line in lines:
                  if line.startswith("BH") or line.startswith("BC"):
                      symbol = str(line.split(",")[1])
                      all_bars[symbol].append(line)
                      y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
                      y.start()
              data = buf
      

      编辑:忘了说,我只修改了接收数据的代码,我不知道函数的其余部分(以lines = data.split("\n")开头)是做什么的。

      编辑 2: 现在使用“\r\n”代替“\n”进行换行。

      编辑 3: 修复了一个问题。

      【讨论】:

      • 代码卡在if not "\n" in data: continue 其余代码正常工作。
      • 收到的字符串是否总是包含“\n”? (除非数据被“分块”)
      • data 需要 结束 \n 只包含\n
      • 但分块数据不以\n 结尾。因此,此代码改为检查 in 以使用下一个数据块“修复”数据。顺便说一句,我刚刚发现了另一个问题。
      【解决方案5】:

      您基本上似乎想从套接字读取行。也许您最好不要使用低级别的recv 调用,而只需使用sock.makefile() 并将结果视为常规文件,您可以从中读取行:from line in sfile: ...

      这就留下了延迟/块问题。这很可能是由发送方的Nagle's algorithm 引起的。尝试禁用它:

      sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-01-25
        • 1970-01-01
        • 2014-03-18
        • 1970-01-01
        • 2011-05-22
        • 1970-01-01
        • 1970-01-01
        • 2013-10-16
        相关资源
        最近更新 更多