【问题标题】:c++ multithreaded tcp server using futures and asyncc++ 使用期货和异步的多线程 tcp 服务器
【发布时间】:2021-05-09 08:35:19
【问题描述】:

我最近开始学习 c++,并且正在学习一些教程。我正在研究套接字,我决定作为一个附带项目来创建一个小型多线程服务器,如下所示。

我试图在达到CLIENTS_MAX_NUM 后关闭服务器侦听套接字,然后在套接字断开连接后重新打开它,但是这给了我一个错误 10022 (WSAEINVAL),我不太确定我做错了什么.

如果您想重现错误,只需使用 telnet 连接并关闭客户端连接(ctrl+] ,退出)。

任何帮助将不胜感激。

#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>

#pragma comment(lib, "ws2_32.lib")

static constexpr const unsigned int PORT = 5000;
static constexpr const unsigned int CLIENTS_MAX_NUM = 1;
static constexpr const unsigned int CLIENTS_QUEUE_NUM = 10;
SOCKET server_sock;
std::vector<std::future<void>> futures;
std::mutex mtx;

void initialize_winsock() {

    WSAData wsData;
    WORD ver = MAKEWORD(2, 2);

    int wsResult = WSAStartup(ver, &wsData);

    if (wsResult != 0) {

        std::cerr << WSAGetLastError() << std::endl;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

}

void bind_server_socket() {
    
    int keep_alive = 1;
    int re_use = 1;

    if (setsockopt(server_sock, SOL_SOCKET, SO_KEEPALIVE, (const char*)&keep_alive, sizeof(keep_alive)) == SOCKET_ERROR) {
        
        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&re_use, sizeof(re_use)) == SOCKET_ERROR) {

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    sockaddr_in server;

    server.sin_family = AF_INET;
    server.sin_port = htons(PORT);
    server.sin_addr.S_un.S_addr = INADDR_ANY;

    memset(&server.sin_zero, 0, 8);

    if (bind(server_sock, (sockaddr*)&server, sizeof(sockaddr)) == SOCKET_ERROR) {

        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }



}

void open_server_socket(bool &listening) {
    
    server_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    if (server_sock == INVALID_SOCKET) {

        std::cerr << WSAGetLastError() << std::endl;

        listening = false;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

    listening = true;

}

void close_server_socket(bool &listening) {
    
    closesocket(server_sock);

    listening = false;

}

void handle_client(SOCKET client_sock, sockaddr_in client) {
    
    char buf[4096];
    char host[NI_MAXHOST];
    char service[NI_MAXHOST];

    memset(host, 0, NI_MAXHOST);
    memset(service, 0, NI_MAXHOST);

    //std::cout << std::this_thread::get_id() << std::endl;

    if (getnameinfo((sockaddr*)&client, sizeof(client), host, NI_MAXHOST, service, NI_MAXSERV, 0) == 0) {

        std::cout << host << " connected on port " << service << std::endl;

    }
    else {

        inet_ntop(AF_INET, &client.sin_addr, host, NI_MAXHOST);

        std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;

    }

    while (true) {

        memset(&buf, 0, 4096);

        const int bytes_received = recv(client_sock, buf, 4096, 0);

        if (bytes_received == SOCKET_ERROR) {

            std::cerr << WSAGetLastError() << std::endl;

            WSACleanup();

        }
        else if (bytes_received == 0) {

            std::cout << "client disconnected" << std::endl;
            break;

        }
        else {

            send(client_sock, buf, bytes_received + 1, 0);

        }

    }

}

int main(int argc, const char* argv[]) {
    
    bool listening = false;

    initialize_winsock();
    open_server_socket(listening);
    bind_server_socket();

    // -----------------------------------------------------------

    if (listen(server_sock, CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {

        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }
    else {

        std::cout << "listening for incoming connections on port " << PORT << std::endl;

        while (true) {

            unsigned int removed = 0;

            for (int i = 0; i < futures.size(); i++) {

                auto status = futures.at(i).wait_for(std::chrono::milliseconds(0));

                if (status == std::future_status::ready) {

                    futures.erase(futures.begin() + i);

                    removed++;

                }

            }

            if (removed > 1) {
                std::cout << removed << " clients removed" << std::endl;
            }
            else if (removed) {
                std::cout << removed << " client removed" << std::endl;
            }

            if (futures.size() < CLIENTS_MAX_NUM && !listening) {

                std::cout << "re-opening server socket" << std::endl;

                open_server_socket(listening);

                // BOOM <--- 10022 (WSAEINVAL) - https://docs.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2

            }

            if (listening) {

                sockaddr_in client;

                memset(&client.sin_zero, 0, 8);

                int client_size = sizeof(client);

                SOCKET client_sock = accept(server_sock, (sockaddr*)&client, &client_size);

                if (client_sock == INVALID_SOCKET) {

                    std::cerr << WSAGetLastError() << std::endl;

                    closesocket(server_sock);
                    WSACleanup();
                    exit(EXIT_FAILURE);

                }
                else {
                    
                    futures.emplace_back(std::async(std::launch::async, &handle_client, client_sock, client));

                    if (futures.size() >= CLIENTS_MAX_NUM && listening) {

                        std::cout << "closing server socket" << std::endl;

                        close_server_socket(listening);

                    }

                    std::cout << futures.size() << " clients connected" << std::endl;

                }

            }

        }

    }

    // ----------------------------------------------------------

    std::cout << "bye!" << std::endl;

    WSACleanup();
    exit(EXIT_SUCCESS);

}

【问题讨论】:

  • 具体是什么给了你这个错误?关闭?重开?或者下次您使用重新打开但未重新绑定的服务器套接字调用 accept 时?
  • 当我尝试重新打开套接字行号 211(注释所在的位置)时。我也尝试在重新打开后调用bind_server_socket,但出现同样的错误。所以我猜它是别的东西?

标签: c++ multithreading sockets winsock2


【解决方案1】:

经过一番挖掘,我发现了几个愚蠢的错误,下面是工作代码:

更新

还修复了矢量循环

#include <iostream>
#include <vector>
#include <string>
#include <future>
#include <chrono>
#include <WS2tcpip.h>

#pragma comment(lib, "ws2_32.lib")

static constexpr const unsigned int PORT = 5000;
static constexpr const unsigned int CLIENTS_MAX_NUM = 5;
static constexpr const unsigned int CLIENTS_QUEUE_NUM = 0;

void handle_client(SOCKET client_sock, sockaddr_in client) {

    char buf[4096];
    char host[NI_MAXHOST];
    char service[NI_MAXHOST];

    memset(host, 0, NI_MAXHOST);
    memset(service, 0, NI_MAXHOST);

    //std::cout << std::this_thread::get_id() << std::endl;

    if (getnameinfo((sockaddr*)&client, sizeof(client), host, NI_MAXHOST, service, NI_MAXSERV, 0) == 0) {

        std::cout << host << " connected on port " << service << std::endl;

    }
    else {

        inet_ntop(AF_INET, &client.sin_addr, host, NI_MAXHOST);

        std::cout << host << " connected on port " << ntohs(client.sin_port) << std::endl;

    }

    while (true) {

        memset(&buf, 0, 4096);

        const int bytes_received = recv(client_sock, buf, 4096, 0);

        if (bytes_received == SOCKET_ERROR) {

            std::cerr << WSAGetLastError() << std::endl;

            WSACleanup();

        }
        else if (bytes_received == 0) {

            std::cout << "client disconnected" << std::endl;
            break;

        }
        else {

            send(client_sock, buf, bytes_received + 1, 0);

        }

    }

}

void initialize_winsock() {

    WSAData wsData;
    WORD ver = MAKEWORD(2, 2);

    int wsResult = WSAStartup(ver, &wsData);

    if (wsResult != 0) {

        std::cerr << WSAGetLastError() << std::endl;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

}

void bind_server_socket(SOCKET &server_sock) {
    
    int keep_alive = 1;
    int re_use = 1;

    if (setsockopt(server_sock, SOL_SOCKET, SO_KEEPALIVE, (const char*)&keep_alive, sizeof(keep_alive)) == SOCKET_ERROR) {
        
        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&re_use, sizeof(re_use)) == SOCKET_ERROR) {

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }

    sockaddr_in server;

    server.sin_family = AF_INET;
    server.sin_port = htons(PORT);
    server.sin_addr.S_un.S_addr = INADDR_ANY;

    memset(&server.sin_zero, 0, 8);

    if (bind(server_sock, (sockaddr*)&server, sizeof(sockaddr)) == SOCKET_ERROR) {

        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }



}

void open_server_socket(SOCKET &server_sock, bool &accepting) {
    
    server_sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    if (server_sock == INVALID_SOCKET) {

        std::cerr << WSAGetLastError() << std::endl;

        accepting = false;

        WSACleanup();
        exit(EXIT_FAILURE);

    }

    accepting = true;

}

void close_server_socket(SOCKET &server_sock, bool &accepting) {
    
    closesocket(server_sock);

    accepting = false;

}

void clear_futures(std::vector<std::future<void>> &futures) {

    std::vector<std::future<void>>::iterator it = futures.begin();

    while (it != futures.end()) {

        auto status = (*it).wait_for(std::chrono::milliseconds(0));

        if (status == std::future_status::ready) {

            it = futures.erase(it);

        }
        else {
            ++it;
        }
    }

}

void wait_for_connections(std::vector<std::future<void>> &futures, SOCKET &server_sock, bool& accepting) {

    if (listen(server_sock, CLIENTS_QUEUE_NUM) == SOCKET_ERROR) {

        std::cerr << WSAGetLastError() << std::endl;

        closesocket(server_sock);
        WSACleanup();
        exit(EXIT_FAILURE);

    }
    else {

        std::cout << "accepting for incoming connections on port " << PORT << std::endl;

        while (true) {

            if (futures.size() < CLIENTS_MAX_NUM && !accepting) {

                std::cout << "re-opening server socket" << std::endl;

                open_server_socket(server_sock, accepting);
                bind_server_socket(server_sock);
                wait_for_connections(futures, server_sock, accepting);

                break;

            }

            if (accepting) {

                sockaddr_in client;

                memset(&client.sin_zero, 0, 8);

                int client_size = sizeof(client);

                SOCKET client_sock = accept(server_sock, (sockaddr*)&client, &client_size);

                if (client_sock == INVALID_SOCKET) {

                    std::cerr << WSAGetLastError() << std::endl;

                    closesocket(server_sock);
                    WSACleanup();
                    exit(EXIT_FAILURE);

                }

                clear_futures(futures);

                futures.emplace_back(std::async(std::launch::async, &handle_client, client_sock, client));

                if (futures.size() >= CLIENTS_MAX_NUM && accepting) {

                    std::cout << "closing server socket" << std::endl;

                    close_server_socket(server_sock, accepting);

                }

                std::cout << futures.size() << " clients connected" << std::endl;

            }

        }

    }

}

int main(int argc, const char* argv[]) {
    
    std::vector<std::future<void>> futures;
    bool accepting = false;
    SOCKET server_sock;

    initialize_winsock();
    open_server_socket(server_sock, accepting);
    bind_server_socket(server_sock);
    wait_for_connections(futures, server_sock, accepting);

    std::cout << "bye!" << std::endl;

    WSACleanup();
    exit(EXIT_SUCCESS);

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多