【问题标题】:Keep TCP socket connection alive and read/write coordination保持 TCP 套接字连接活动和读/写协调
【发布时间】:2020-01-12 21:00:22
【问题描述】:

我的设置和环境:

  • Win10
  • TCP 客户端:带有 asio 库的 C++(无增强),在后台线程中运行。
  • TCP 服务器:带有SocketServer 模块的 Python3,在后台线程中运行。
  • 目前都使用阻塞 I/O

要求:

  • 客户端偶尔向服务器发送用户交互的字符串命令。
  • 服务器接收命令并执行操作。

问题:

  • 客户端可能会挂在read()
  • 在客户端挂起后,服务器可能会在 recv() 处挂起。

“净”结果是服务器总是得到我最初的“Hello”握手,但它的响应字符串ACK 从未被客户端收到。看起来需要在客户端和服务器之间进行某种读/写协调。

https://stackoverflow.com/a/1480246/987846Does the TCPServer + BaseRequestHandler in Python's SocketServer close the socket after each call to handle()? 之类的问题中,我了解到可能涉及两个问题

  • Python TCP 服务器总是在接收时关闭连接,这样处理程序就不会被连续调用。
  • TCP 连接在因超时而关闭之前需要保持活动状态。

我想知道如何解决这个问题,什么是满足我要求的最佳策略

  • 在服务器和客户端之间设计一个周期性的“健全性检查”乒乓数据传输。
  • 求助于我不熟悉的非阻塞 I/O。

或者这只是我的一个错误?

服务器代码:

import socketserver
import sys
import threading

_dostuff = True

class CmdHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:                        
            data = self.request.recv(1024)
            s = data.decode('utf-8')
            if s == 'Hello':
                print('HANDSHAKE: ACK', flush=True)
                self.request.send('ACK\x00'.encode())
            if s == 'Stop':            
                print('Cmd: Mute', flush=True)
                with threading.Lock():
                    _dostuff = False
            if s == 'Start':
                with threading.Lock():
                    _dostuff = True
        return

if __name__ == '__main__':
    import socket    
    import time

    # Command server
    address = ('localhost', 1234)  # let the kernel assign a port
    cmd_server = socketserver.TCPServer(address, CmdHandler)
    cmd_ip, cmd_port = cmd_server.server_address  # what port was assigned?

    t1 = threading.Thread(target=cmd_server.serve_forever)
    t1.setDaemon(True)  # don't hang on exit
    t1.start()

    while True:            
        time.sleep(1)

客户端代码(部分)

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 128> buf;
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    asio::write(socket, asio::buffer(handshake.c_str(), handshake.length()));
                    if (error == asio::error::eof)
                        continue; // Connection closed cleanly by peer; keep trying.
                    else if (error)
                        throw asio::system_error(error); // Some other error.

                    // ***PROBLEM: THIS MAY BLOCK FOREVER***
                    size_t len = asio::read(socket, asio::buffer(buf), error);
                    // ***PROBLEM END***


                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }                   
                    std::string received = std::string(buf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);
            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            debug("CmdClient {}: Send command: {}", m_id.c_str(), cmd.c_str());
            size_t len = asio::write(socket, asio::buffer(cmd.c_str(), cmd.length()));
            debug("CmdClient {}: {} bytes written.", m_id.c_str(), len);
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

如果我删除服务器端的while循环,它看起来像

class CmdHandler(socketserver.StreamRequestHandler):
    # timeout = 5
    def handle(self):
        data = self.request.recv(1024)
        s = data.decode('utf-8')
        if s == 'Hello':
            self.request.send('ACK\x00'.encode())
        if s == 'Stop':            
            with threading.Lock():
                _dostuff = False
        if s == 'Start':
            with threading.Lock():
                _dostuff = True
        return

然后

  • 客户端收到服务器的ACK
  • 但是Client发送的后续消息不会被Server接收到。

【问题讨论】:

  • 我通过专门使用非阻塞 I/O 来避免这些问题;使用非阻塞 I/O,您永远不会因为阻塞调用而失去对线程的控制。 (这确实意味着您必须实现状态机代码来处理部分读取和写入,但这都是可行的)
  • 这是您代码中的错误。您应该在两端使用读取超时。但是,如果每次您的客户端应该收到流结束指示时服务器真的关闭连接。
  • @JeremyFriesner 您的非阻塞 I/O 技术到底是什么?是asio的异步API还是原始套接字select
  • @user207421 如果我使用超时,是否意味着套接字将在超时后关闭,然后客户端必须在每次通信时重新连接到服务器套接字?
  • 不,如果发生超时,它不会关闭,它只会抛出任何涉及的异常,或者返回任何涉及的错误代码。读取超时对连接来说不是致命的。

标签: python c++ sockets networking


【解决方案1】:

所以我终于解决了这个问题。非常感谢@bigdataolddriver 的离线帮助。我学到了很多关于 ncat 调试的知识。

我基本上

  • 在服务器端:放弃了使用 Python 的socketserver 模块的想法。一方面,我发现it's synchronous only
  • 在客户端:使用asio::ip::tcp::socket::read_some / asio::ip::tcp::socket::write_some 而不是asio::read / asio::write

这是仅基于 socket 模块的新服务器代码。

import socket
import sys
import threading

_dostuff = True

def run_cmd_server():
    global _dostuff
    # Create a TCP/IP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the port
    server_address = ('localhost', 1234)
    print('CmdServer: starting up on {} port {}'.format(*server_address))
    sock.bind(server_address)

    # Listen for incoming connections
    sock.listen(1)

    while True:
        # Wait for a connection
        print('CmdServer: waiting for a connection')
        connection, client_address = sock.accept()
        try:
            print('CmdServer: connection from client:', client_address)

            # Receive the data in small chunks and retransmit it
            while True:
                data = connection.recv(1024)
                print('received {!r}'.format(data))
                if not data:
                    print('no data from', client_address)
                    break
                cmd = data.decode('utf-8').strip('\x00')
                if cmd == 'Hello':
                    print('Cmd: {}'.format(cmd))
                    connection.sendall('ACK\x00'.encode('utf-8'))
                elif cmd == 'Stop':
                    print('Cmd: {}'.format(cmd))                    
                    _dostuff = False
                    print('_dostuff : {}'.format(_dostuff ))
                elif cmd == 'Start':
                    _dostuff = True
                    print('_dostuff : {}'.format(_dostuff ))
                else:
                    print('Misc: {}'.format(cmd))
                connection.sendall('ack\x00'.encode('utf-8'))
        except:
            continue;
        finally:
            # Clean up the connection
            connection.close()


def main():
    t1 = threading.Thread(target=run_cmd_server, name='t_cmd', daemon=True)
    t1.start()
    t1.join()


if __name__ == '__main__':
    main()

这是新的客户端代码:

virtual bool Connect() override {
        bool isInitialized = false;
        try {
            asio::io_context io_context;
            asio::ip::tcp::resolver resolver(io_context);
            asio::ip::tcp::resolver::query query("127.0.0.1", "1234");
            asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            asio::ip::tcp::socket socket(io_context);
            asio::connect(socket, endpoint_iterator);
            while (true) {
                std::array<char, 1024> readBuf{'\0'};
                asio::error_code error;
                // Handshaking
                // - on connection, say hello to cmd-server; wait for ACK
                if ( ! isInitialized ) {
                    debug("CmdClient {}: handshaking ...", m_id.c_str());
                    std::string handshake("Hello");
                    size_t len = socket.write_some(asio::buffer(handshake.c_str(), handshake.length()), error);
                    if (error == asio::error::eof) {
                        asio::connect(socket, endpoint_iterator);
                        continue; // Connection closed cleanly by peer; keep trying.
                    }
                    else if (error)
                        throw asio::system_error(error); // Some other error.
                    len = socket.read_some(asio::buffer(readBuf), error);
                    if (len <= 0) {
                        debug("CmdClient {}: No response", m_id.c_str());
                    }
                    std::string received = std::string(readBuf.data());
                    if (received == std::string("ACK")) {
                        debug("CmdClient {}: handshaking ... SUCCESS!", m_id.c_str());
                        isInitialized = true;
                        Notify("ACK");
                    }
                    else {
                        debug("CmdClient {}: Received: {}", m_id.c_str(), received.c_str());
                    }
                    continue;
                }
                SendCommand(socket);

            }
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
            isInitialized = false;
        }
        return true;
    }


    void SendCommand(asio::ip::tcp::socket& socket) {
        std::string cmd("");
        switch (m_cmd) {
        case NoOp:
            break;
        case Hello:
            cmd = "Hello";
            break;
        case Stop:
            cmd = "Stop";
            break;
        case Start:
            cmd = "Start";
            break;
        default:
            break;
        }
        if (cmd.size() > 0) {
            size_t len = socket.write_some(asio::buffer(cmd.c_str(), cmd.length()));
            m_cmd = NoOp;  // Avoid resend in next frame;
        }
    }

我还没有使用过 ASIO 的异步功能(非常害怕在这次调试会话之后立即这样做)。但是现在至少这段代码可以正常工作:服务器可以正常接收来自客户端的命令。

附带说明,由于只有一个线程写入全局变量_dostuff,因此我删除了线程锁定。

如果有人知道我最初的实现到底哪里出了问题,我仍然会很感激。

【讨论】:

    猜你喜欢
    • 2020-12-20
    • 1970-01-01
    • 2021-12-26
    • 2017-10-29
    • 1970-01-01
    • 2015-10-17
    • 1970-01-01
    • 1970-01-01
    • 2014-02-08
    相关资源
    最近更新 更多