【问题标题】:Recv() returning SOCKET_ERROR when I connect a client to the server instead of blocking and waiting for message当我将客户端连接到服务器而不是阻塞并等待消息时,Recv() 返回 SOCKET_ERROR
【发布时间】:2019-06-07 10:49:42
【问题描述】:

我对 C++ 中的网络编程和多线程比较陌生。目前我的 recv() 调用返回一个未知错误。目前我不太确定错误来自哪里,希望能得到一些帮助。

我用putty本地连接服务器

class Threaded_TCPListener{

    int Threaded_TCPListener::Init()
    {
        // Initializing WinSock
        WSADATA wsData;
        WORD ver = MAKEWORD(2,2);

        int winSock = WSAStartup(ver, &wsData);
        if(winSock != 0)
            return winSock;

        // Creating listening socket
        this->socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        if(this->socket == INVALID_SOCKET)
            return WSAGetLastError();

        // Fill sockaddr with ip addr and port
        sockaddr_in hint;
        hint.sin_family = AF_INET;
        hint.sin_port = htons(this->port);
        inet_pton(AF_INET, this->ipAddress, &hint.sin_addr);

        // Bind hint to socket
        if(bind(this->socket, (sockaddr*)&hint, sizeof(hint)) == SOCKET_ERROR)
            return WSAGetLastError();

        // Start listening on socket
        if(listen(this->socket, SOMAXCONN) == SOCKET_ERROR)
            return WSAGetLastError();

        // Accept first client
        this->createAcceptThread();

        return 0;
    }

    int Threaded_TCPListener::Run()
    {
        bool isRunning = true;

        // Read from all clients
        std::vector<std::thread> threads;
        threads.reserve(this->clients.size());

        // Recv from client sockets
        for (int i=0; i < this->clients.size(); ++i)
        {
            threads.emplace_back(std::thread(&Threaded_TCPListener::receiveFromSocket, this, socket));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

        return 0;
    }

    void Threaded_TCPListener::onMessageReceived(int clientSocket, const char* msg, int length)
    {
        Threaded_TCPListener::broadcastToClients(clientSocket, msg, length);

        std::thread t(&Threaded_TCPListener::receiveFromSocket, this, clientSocket);

        t.detach();

        return;
    }

    void Threaded_TCPListener::sendMessageToClient(int clientSocket, const char * msg, int length)
    {
        send(clientSocket, msg, length, 0);

        return;
    }

    void Threaded_TCPListener::broadcastToClients(int senderSocket, const char * msg, int length)
    {
        std::vector<std::thread> threads;
        threads.reserve(clients.size());

        // Iterate over all clients
        for (int sendSock : this->clients)
        {
            if(sendSock != senderSocket)
                threads.emplace_back(std::thread(&Threaded_TCPListener::sendMessageToClient, this,sendSock, msg, length));
        }

        // Wait for all threads to finish
        for(std::thread& t : threads)
            t.join();

        return;
    }

    void Threaded_TCPListener::createAcceptThread()
    {
        // Start accepting clients on a new thread
        this->listeningThread = std::thread(&Threaded_TCPListener::acceptClient, this);

        this->listeningThread.detach();

        return;
    }

    void Threaded_TCPListener::acceptClient()
    {
        int client = accept(this->socket, nullptr, nullptr);

        // Error
        if(client == INVALID_SOCKET)
        {
            std::printf("Accept Err: %d\n", WSAGetLastError());
        }
        // Add client to clients queue
        else
        {
            // Add client to queue
            this->clients.emplace(client);

            // Client Connect Confirmation
            onClientConnected(client); // Prints msg on server

            // Create another thread to accept more clients
            this->createAcceptThread();
        }

        return;
    }

    void Threaded_TCPListener::receiveFromSocket(int receivingSocket)
    {
        // Byte storage
        char buff[MAX_BUFF_SIZE];

        // Clear buff
        memset(buff, 0, sizeof(buff));

        // Receive msg
        int bytesRecvd = recv(receivingSocket, buff, MAX_BUFF_SIZE, 0);
        if(bytesRecvd <= 0)
        {
            char err_buff[1024];
            strerror_s(err_buff, bytesRecvd);

            std::cerr << err_buff;
            // Close client
            this->clients.erase(receivingSocket);
            closesocket(receivingSocket);

            onClientDisconnected(receivingSocket); // Prints msg on server
        }
        else
        {
            onMessageReceived(receivingSocket, buff, bytesRecvd);
        }
    }
}

我正在尝试创建一个多线程 TCP“服务器”,该服务器将通过持续运行一个接受线程(侦听新连接)来处理传入的客户端,并为每个连接到服务器的客户端等待一个接收块的线程。

【问题讨论】:

  • 显示的代码不完整。没有提供socketclients 的定义,但它们极不可能是线程安全对象——它们似乎是普通的int 和某种std::vector,因为它们是被多个线程访问这是未定义的行为,并且很可能发生随机崩溃。这已经是失败了。此外,尚不清楚执行如何从原始线程到达receiveFromSocket()。缺少代码的关键部分。请阅读 stackoverflow.com 对 minimal reproducible exampleedit 相应问题的要求。
  • 当 Winsock 函数失败时,使用WSAGetLastError() 找出原因。没有“未知”错误。您的 Run() 正在将收听的 socket 传递给 receiveFromSocket(),而它应该传递 clients[i]。此外,您不应为每次调用accept() 创建一个新线程。在服务器的生命周期内创建一个循环调用accept() 的线程。
  • 谢谢,我试图尽可能地包含一些,但我删掉了太多。

标签: c++ multithreading tcp network-programming


【解决方案1】:
  1. 您的Init 看起来不错:

    • 创建套接字,绑定它,监听它,启动接受线程
  2. 在您的接受线程中,acceptClient 看起来还不错:

    • 打印一些信息
    • 将客户端套接字添加到clients队列
    • 创建一个新的接受线程
  3. 您的Run 毫无意义:

    • clients 中的每个元素创建一个线程以接收来自监听socket

看起来您正在为每个套接字操作生成一个新线程。这是一个相当浪费的设计。一旦线程完成,它就可以返回做其他事情。

因此在acceptClient 中创建一个新的接受线程是一种浪费,您可以直接循环回到开头到::accept 下一个客户端。像这样:

acceptClient() {
  while (alive) {
    int client = accept(socket, ...);
    createClientHandler(client);
  }
}

似乎缺少的是生成一个新的客户端线程来服务客户端套接字。您目前在Run 中执行此操作,但那是在实际接受任何客户端之前。而你这样做是为了错误套接字!相反,您应该在acceptClient 中生成receiveFromSocket 线程, 将其传递给client 套接字。所以这是一个错误。

在您的receiveFromSocket 中,您也无需再次为receiveFromSocket 创建另一个线程——只需循环回到开头即可。

这种每次操作线程设计的最大问题是,您会在每条传入消息上生成发送者线程。这意味着您实际上可以有多个发送者线程尝试在同一个 TCP 套接字上::send。这不是很安全。

对 WSASend 的调用顺序也是缓冲区传输到传输层的顺序。 WSASend 不应该在同一个面向流的套接字上同时从不同的线程调用,因为一些 Winsock 提供程序可能会将一个大的发送请求拆分为多个传输,这可能会导致来自同一个面向流的多个并发发送请求的意外数据交错插座。

https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsasend

同样,我建议您不要在broadcastToClients 中生成线程,而是在acceptClient 中为每个客户端套接字生成一个持久发送者线程(连同一些createClientHandler 中的receiveFromSocket 线程)。

要与发送者线程通信,您应该使用线程安全的阻塞队列。每个发送者线程如下所示:

while (alive) {
  msg = queue.next_message();
  send(client_socket, msg);
}

然后在收到消息时,您只需执行以下操作:

for (client : clients) {
  client.queue.put_message(msg);
}

总而言之,要处理每个客户端,您需要这样的结构:

struct Client {
  int client_socket;
  BlockingQueue queue;

  // optionally if you want to keep track of your threads
  // to be able to safely clean up
  std::thread recv_thread, send_thread;
};

安全清理是另一回事。


最后,在您的代码中对这条评论进行评论:

        // Wait for all threads to finish
        for(std::thread& t : threads)
        {
            t.detach();
        }

这与std::thread::detach 所做的几乎相反: https://en.cppreference.com/w/cpp/thread/thread/detach 它允许您销毁线程对象,而无需等待线程完成执行。

【讨论】:

  • 感谢您的分解,我对这些主题还是新手,我很感激任何和所有的知识!
【解决方案2】:

代码中有一个关于如何实现 TCP 服务器的误解:

您似乎假设您可以拥有一个可以处理所有通信的服务器套接字文件描述符。不是这种情况。您必须有一个专用的套接字文件描述符,该描述符仅用于侦听和接受传入连接,然后每个现有连接都有一个附加文件描述符。

在您的代码中,我看到您始终使用侦听套接字调用 receiveFromSocket()。这是错误的。在循环中为所有客户端调用 receiveFromSocket() 也是错误的。

您需要做的是: - 有一个在循环中调用accept() 的专用线程。从多个线程调用 accept() 没有性能优势。 - 一个 accept() 返回一个新连接,您会生成一个新线程,该线程在循环中调用 recv()。然后,这将按照您在问题中的预期阻止并等待新数据。

您还需要放弃从新线程调用单个函数的习惯。这不是多线程编程。一个线程通常包含一个循环。其他一切通常都是设计缺陷。

另请注意,多线程编程在 2019 年仍然是火箭科学,尤其是在 C++ 中。如果您不是绝对的专家,您将无法做到这一点。另请注意,多线程编程的绝对专家将尽可能避免多线程编程。许多看似并发的 I/O 绑定任务可以更好地由基于单线程事件的系统处理。

【讨论】:

  • 是的,我听说它非常困难!不过,我发现多线程令人兴奋,并希望在这方面做得更好。我会记住单线程是首选!另外我会研究基于事件的系统,所以谢谢你的提示。
猜你喜欢
  • 2016-08-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-03-12
相关资源
最近更新 更多