【问题标题】:Asynchronous I/O on C++ SocketsC++ 套接字上的异步 I/O
【发布时间】:2021-01-15 18:48:36
【问题描述】:

我正在编写一个多线程套接字应用程序(用于 Windows),但是在连接多个客户端时出现问题。我可以从服务器向客户端发送消息,但是,我只能从一个客户端向服务器发送消息。其他客户端无法向服务器发送消息。我搜索了一些东西,发现重叠/异步 I/O 是要走的路。这只是一个问题,我不知道如何实现,所以我基本上是在问我将如何去做,或者这种方法是否错误。

RecieveFromClients() 函数是我想要异步的。

提前致谢。

这是我的代码:

main.cpp

#include "server.h"

int main()
{
    Server server;

    server.StartServer();

    return 0;
}

服务器.h

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iphlpapi.h>
#include <stdio.h>

#include <string>
#include <iostream>
#include <vector>
#include <thread>

#pragma comment(lib, "Ws2_32.lib")

#define DEFAULT_PORT 4566
#define BACKLOG 10

class Server
{
public:
    Server();
    ~Server();
    int StartServer();
private:
    int Socket();
    int Bind();
    int Listen();
    void AcceptConnections();
    int StopServer();
    int CloseClientSocket(int client_num);

    void GetClients(int server_socket, std::vector<int>* clients,
        std::vector<sockaddr_in> *client_addrs);

    void SendToClients();
    void RecieveFromClients(int id);
private:
    sockaddr_in server_addr;
    int connected_clients, counted_clients;
    int server_socket;
    int result;
    int msgSize;
    std::vector<int> clients;
    std::vector<sockaddr_in> client_addrs;
private:
    std::thread get_clients;
    std::thread send_messages;
    std::thread recieve_messages;
};

服务器.cpp

#include "server.h"

Server::Server()
    :
    connected_clients(0),
    counted_clients(0),
    server_socket(0),
    result(0)
{
    ZeroMemory(&server_addr, 0);

    WSAData wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
    {
        printf("wsastartip falied\n");
    }
}

Server::~Server()
{
    WSACleanup();
}

int Server::StartServer()
{
    if (Socket() != 0)
    {
        return 1;
    }

    if (Bind() != 0)
    {
        return 1;
    }

    if (Listen() != 0)
    {
        return 1;
    }

    AcceptConnections();

    return 0;
}

int Server::Socket()
{
    server_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket == INVALID_SOCKET)
    {
        std::cout << "Failed to create socket" << std::endl;
        return 1;
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(DEFAULT_PORT);
    server_addr.sin_addr.S_un.S_addr = INADDR_ANY;

    return 0;
}

int Server::Bind()
{
    result = bind(server_socket, (sockaddr*)&server_addr,
        sizeof(server_addr));
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to bind socket" << std::endl;
        return 1;
    }

    return 0;
}

int Server::Listen()
{
    result = listen(server_socket, BACKLOG);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Listening failed" << std::endl;
        return 1;
    }

    return 0;
}

void Server::AcceptConnections()
{
    get_clients = std::thread(&Server:: GetClients, this, server_socket,
        &clients, &client_addrs);
    send_messages = std::thread(&Server::SendToClients, this);
    recieve_messages = std::thread(&Server::RecieveFromClients, this, counted_clients);
    get_clients.join();
    send_messages.join();
    recieve_messages.join();
}

int Server::StopServer()
{
    std::terminate();

    for (int client : clients)
    {
        result = closesocket(client);
        if (result == SOCKET_ERROR)
        {
            std::cout << "Failed to close client socket" << std::endl;
            return 1;
        }
    }

    return 0;
}

int Server::CloseClientSocket(int client_num)
{
    result = closesocket(clients[client_num]);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to close client socket" << std::endl;
        return 1;
    }
    return 0;
}

void Server::GetClients(int server_socket, std::vector<int>* clients,
    std::vector<sockaddr_in>* client_addrs)
{
    
    while(true)
    {
        sockaddr_in client_addr = { 0 };
        socklen_t client_addrstrlen = sizeof(client_addr);
        int client;

        client = accept(server_socket, (sockaddr*)&client_addr,
            &client_addrstrlen);

        clients->push_back(client);
        client_addrs->push_back(client_addr);
        ++connected_clients;

        char ip[INET_ADDRSTRLEN] = "";
        char port[100] = "";
        inet_ntop(AF_INET, &client_addr.sin_addr, ip, sizeof(ip));
        std::cout << "Client connected from " << ip << ":" << 
            client_addr.sin_port << std::endl;
    }
}

void Server::SendToClients()
{
    std::string msg;
    do
    {
        msg.clear();
        getline(std::cin, msg);
        if (msg.size() > 255)
        {
            std::cout << "Message must be less than 256 bytes"
                << std::endl;
            continue;
        }

        for (int client : clients)
        {
            int size;
            size = send(client, msg.data(), msg.size(), 0);
            if (size == SOCKET_ERROR)
            {
                std::cout << "Failed to send message to client"
                    << std::endl;
            }
        }
    } while (msg != "exit");

    if (StopServer() != 0)
    {
        std::cout << "Failed to close client sockets" << std::endl;
    }
}

void Server::RecieveFromClients(int id)
{
    std::vector<char> msgBuffer(256);
    do
    {
        
        if (connected_clients > 0)
        {
            msgBuffer.clear();
            msgBuffer.resize(256);
            char ip[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, &client_addrs[id].sin_addr, ip, sizeof(ip));

            if (msgSize = recv(clients[id], msgBuffer.data(),
                msgBuffer.size(), 0) > 0)
            {
                std::cout << ip << ": ";
                for (char c : msgBuffer)
                {
                    if (c != 0)
                    {
                        std::cout << c;
                    }
                }
                std::cout << std::endl;
            }
            else
            {
                if (msgSize == SOCKET_ERROR)
                {
                    std::cout << "Failed to recieve data" << std::endl;
                    break;
                }
                else if (clients[id] > 0)
                {
                    std::cout << "Client " << ip << " has disconnected" << std::endl;
                    CloseClientSocket(0);
                    break;
                }
            }
        }
        else
        {
            continue;
        }
    } while (true);
}

【问题讨论】:

标签: c++ sockets winapi recv


【解决方案1】:

使用重叠 I/O 将对您的代码设计产生相当大的改变。但幸运的是,有一个更简单的解决方案。在您的 RecieveFromClients() 方法中,您可以使用select() 来确定哪些客户端套接字实际上有待读取的数据在您尝试从中读取之前。您正在使用阻塞套接字,因此对 recv() 的调用将阻塞调用线程,直到接收到数据,因此您不希望在实际准备好读取某些内容之前执行阻塞读取。

此外,由于您没有为每个接受的客户端创建新线程,RecieveFromClients()id 参数和 CloseClientSocket()client_num 参数使用不正确,应该完全删除。接收函数应该在连接的客户端列表上运行一个循环。并且关闭函数应该使用特定的套接字句柄来关闭。

话虽如此,您的代码的另一个主要设计问题是您在多个线程之间共享变量和容器,但您没有同步对其中任何一个的访问。从长远来看,这会给你带来很大的问题。

【讨论】:

  • 谢谢你,我在过去的 5 个小时里一直坚持这个大声笑。是的,我一定会研究共享变量问题
  • 是的,id 部分来自旧版本,我现在用 select() 工作删除了它
猜你喜欢
  • 1970-01-01
  • 2011-09-16
  • 2012-07-29
  • 2010-09-12
  • 1970-01-01
  • 2011-11-25
  • 2013-06-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多