最近在看《C++百万并发网络通信引擎架构与实现(服务端、客户端、跨平台)》这个视频,借此重新复习下Windows以及Linux、MacOS下的C++网络编程。另外因为最近自己使用boost写了一个TCP服务器压力测试工具,模拟多个客户端设备连接指定的服务器,并定时向服务器推送数据,以测试服务器的并发连接数等,感觉看这个视频收获还蛮大的。
下面是Windows下使用Select模型实现的一个简易TCP服务端和客户端,客户端添加了一个命令输入线程,代码如下:
一、服务端程序代码如下:
// Server.cpp
#include <stdio.h>
#include <iostream>
#include <vector>
#include <algorithm>
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <WinSock2.h>
#pragma comment(lib, "ws2_32.lib")
using namespace std;
// Message type identifiers stored in DataHeader::cmd.
// The numeric values are part of the wire format shared by client and
// server, so they are spelled out explicitly.
enum CMDTYPE
{
    CMD_LOGIN         = 0,  // login request
    CMD_LOGIN_RESULT  = 1,  // reply to a login request
    CMD_LOGOUT        = 2,  // logout request
    CMD_LOGOUT_RESULT = 3,  // reply to a logout request
    CMD_NEW_USER_JOIN = 4,  // broadcast: another client connected
    CMD_ERROR         = 5   // reply to an unrecognised request
};
// Fixed-size header that starts every message; all message structs in this
// file derive from it and processor() reads it first.
struct DataHeader
{
int cmd;        // message type; the message constructors store a CMDTYPE value here
int dataLength; // message length in bytes; constructors set it from sizeof(the message struct), header included
};
// Login request: header followed by two fixed-size, NUL-terminated strings.
struct Login : public DataHeader
{
    // Zero-fills both string buffers. The struct is transmitted byte-for-byte,
    // so leaving the arrays uninitialised would put whatever happened to be in
    // memory after the terminating NUL onto the wire.
    Login()
        : userName(), passWord()
    {
        dataLength = sizeof(Login);
        cmd = CMD_LOGIN;
    }
    char userName[32];
    char passWord[32];
};
// Reply to a CMD_LOGIN request.
struct LoginResult : public DataHeader
{
    LoginResult()
    {
        // dataLength must describe THIS message. Advertising sizeof(Login)
        // here would tell the receiver to expect far more bytes than are
        // actually sent, so it would block or swallow the next message.
        dataLength = sizeof(LoginResult);
        cmd = CMD_LOGIN_RESULT;
        result = 0;
    }
    int result; // starts at 0; no other value is assigned anywhere in this file
};
// Logout request: header followed by a fixed-size, NUL-terminated user name.
struct Logout : public DataHeader
{
    // Zero-fills userName: the struct is transmitted byte-for-byte, so the
    // bytes after the terminating NUL must not be left uninitialised.
    Logout()
        : userName()
    {
        dataLength = sizeof(Logout);
        cmd = CMD_LOGOUT;
    }
    char userName[32];
};
// Reply to a CMD_LOGOUT request.
struct LogoutResult : public DataHeader
{
    int result; // set to 0 by the constructor

    // Fills in the header for this message type and clears the result code.
    LogoutResult()
        : result(0)
    {
        cmd = CMD_LOGOUT_RESULT;
        dataLength = sizeof(LogoutResult);
    }
};
// Notification sent to already-connected clients when another client connects.
struct NewUserJoin : public DataHeader
{
    int sock; // set to 0 by the constructor

    // Fills in the header for this message type and clears the socket field.
    NewUserJoin()
        : sock(0)
    {
        cmd = CMD_NEW_USER_JOIN;
        dataLength = sizeof(NewUserJoin);
    }
};
// Sockets of all currently connected clients. main() appends to it after
// accept(), removes entries when processor() returns -1, and closes whatever
// is left on shutdown.
std::vector<SOCKET> g_clientList;
int processor(SOCKET sock)
{
char szRecv[4096] = {};
int recvLen = recv(sock, szRecv, sizeof(DataHeader), 0);
DataHeader *pHeader = (DataHeader*)szRecv;
if (recvLen <= 0)
{
printf("客户端<Socket=%d>已退出,任务结束...", sock);
return -1;
}
switch (pHeader->cmd)
{
case CMD_LOGIN:
{
Login *login = (Login*)szRecv;
recv(sock, szRecv + sizeof(DataHeader), pHeader->dataLength - sizeof(DataHeader), 0);
printf("收到客户端<Socket=%d>请求:CMD_LOGIN, 数据长度:%d, userName:%s Password: %s\n",
sock, login->dataLength, login->userName, login->passWord);
LoginResult ret;
send(sock, (char*)&ret, sizeof(LoginResult), 0);
}
break;
case CMD_LOGOUT:
{
Logout *logout = (Logout*)szRecv;
recv(sock, szRecv + sizeof(DataHeader), pHeader->dataLength - sizeof(DataHeader), 0);
printf("收到客户端<Socket=%d>请求:CMD_LOGOUT, 数据长度:%d, userName:%s\n",
sock, logout->dataLength, logout->userName);
LogoutResult ret;
send(sock, (char*)&ret, sizeof(LogoutResult), 0);
}
break;
default:
{
DataHeader header = { 0, CMD_ERROR };
send(sock, (char*)&header, sizeof(header), 0);
break;
}
}
return 0;
}
int main(int argc, char *agrv[])
{
WORD wVersionRequested;
WSADATA wsaData;
int err;
wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
return 1;
}
if (LOBYTE(wsaData.wVersion) != 2 ||
HIBYTE(wsaData.wVersion) != 2) {
WSACleanup();
return 1;
}
SOCKET ListenSocket;
ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ListenSocket == INVALID_SOCKET) {
printf("Error at socket(): %ld\n", WSAGetLastError());
WSACleanup();
return 1;
}
sockaddr_in service;
service.sin_family = AF_INET;
service.sin_addr.s_addr = INADDR_ANY;
service.sin_port = htons(27015);
if (bind(ListenSocket,
(SOCKADDR*)&service,
sizeof(service)) == SOCKET_ERROR) {
printf("bind() failed.绑定网络端口失败\n");
closesocket(ListenSocket);
WSACleanup();
return 1;
}
else
{
printf("绑定网络端口成功...\n");
}
if (listen(ListenSocket, 5) == SOCKET_ERROR) {
printf("错误,监听网络端口失败...\n");
closesocket(ListenSocket);
WSACleanup();
return 1;
}
else
{
printf("监听网络端口成功...\n");
}
printf("等待客户端连接...\n");
while (true)
{
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
FD_SET(ListenSocket, &readfds);
FD_SET(ListenSocket, &writefds);
FD_SET(ListenSocket, &exceptfds);
for (int n = (int)g_clientList.size() - 1; n >= 0 ; n--)
{
FD_SET(g_clientList[n], &readfds);
}
timeval timeout = { 1, 0 };
int ret = select(ListenSocket + 1, &readfds, &writefds, &exceptfds, &timeout);
if (ret < 0)
{
printf("select任务结束,called failed:%d!\n", WSAGetLastError());
break;
}
if (FD_ISSET(ListenSocket, &readfds))
{
SOCKADDR_IN clientAddr = {};
int nAddrLen = sizeof(SOCKADDR_IN);
SOCKET ClientSocket = INVALID_SOCKET;
ClientSocket = accept(ListenSocket, (SOCKADDR*)&clientAddr, &nAddrLen);
if (INVALID_SOCKET == ClientSocket) {
printf("accept() failed: %d,接收到无效客户端Socket\n", WSAGetLastError());
return 1;
}
else
{
for (int n = (int)g_clientList.size() - 1; n >= 0; n--)
{
NewUserJoin userJoin;
send(g_clientList[n], (const char*)&userJoin, sizeof(NewUserJoin), 0);
}
g_clientList.push_back(ClientSocket);
printf("新客户端<Sokcet=%d>加入,Ip地址:%s,端口号:%d\n", ClientSocket, inet_ntoa(clientAddr.sin_addr),
ntohs(clientAddr.sin_port));
}
}
for (int i = 0; i < (int)readfds.fd_count - 1; ++i)
{
if (-1 == processor(readfds.fd_array[i]))
{
auto iter = std::find(g_clientList.begin(), g_clientList.end(),
readfds.fd_array[i]);
if (iter != g_clientList.end())
{
g_clientList.erase(iter);
}
}
}
}
for (int n = (int)g_clientList.size() - 1; n >= 0; n--)
{
closesocket(g_clientList[n]);
}
closesocket(ListenSocket);
WSACleanup();
printf("服务端已退出,任务结束\n");
getchar();
return 0;
}
二、客户端程序代码如下:
// Client.cpp
#include <stdio.h>
#include <string.h>
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <WinSock2.h>
#include <atomic>
#include <thread>
#pragma comment(lib, "ws2_32.lib")
// Message type identifiers stored in DataHeader::cmd.
// The numeric values are part of the wire format shared by client and
// server, so they are spelled out explicitly.
enum CMDTYPE
{
    CMD_LOGIN         = 0,  // login request
    CMD_LOGIN_RESULT  = 1,  // reply to a login request
    CMD_LOGOUT        = 2,  // logout request
    CMD_LOGOUT_RESULT = 3,  // reply to a logout request
    CMD_NEW_USER_JOIN = 4,  // broadcast: another client connected
    CMD_ERROR         = 5   // reply to an unrecognised request
};
// Fixed-size header that starts every message; all message structs in this
// file derive from it and processor() reads it first.
struct DataHeader
{
int cmd;        // message type; the message constructors store a CMDTYPE value here
int dataLength; // message length in bytes; constructors set it from sizeof(the message struct), header included
};
// Login request: header followed by two fixed-size, NUL-terminated strings.
struct Login : public DataHeader
{
    // Zero-fills both string buffers. cmdThread() sends this struct
    // byte-for-byte after strcpy()-ing short strings into it, so without the
    // zero-fill the bytes after each terminating NUL would be uninitialised
    // memory leaked onto the wire.
    Login()
        : userName(), passWord()
    {
        dataLength = sizeof(Login);
        cmd = CMD_LOGIN;
    }
    char userName[32];
    char passWord[32];
};
// Reply to a CMD_LOGIN request.
struct LoginResult : public DataHeader
{
    LoginResult()
    {
        // dataLength must describe THIS message, not the Login request.
        dataLength = sizeof(LoginResult);
        cmd = CMD_LOGIN_RESULT;
        result = 0;
    }
    int result; // starts at 0; printed by processor() when a reply arrives
};
// Logout request: header followed by a fixed-size, NUL-terminated user name.
struct Logout : public DataHeader
{
    // Zero-fills userName: cmdThread() sends this struct byte-for-byte, so
    // the bytes after the terminating NUL must not be left uninitialised.
    Logout()
        : userName()
    {
        dataLength = sizeof(Logout);
        cmd = CMD_LOGOUT;
    }
    char userName[32];
};
// Reply to a CMD_LOGOUT request.
struct LogoutResult : public DataHeader
{
    int result; // set to 0 by the constructor

    // Fills in the header for this message type and clears the result code.
    LogoutResult()
        : result(0)
    {
        cmd = CMD_LOGOUT_RESULT;
        dataLength = sizeof(LogoutResult);
    }
};
// Notification received when another client connects to the server.
struct NewUserJoin : public DataHeader
{
    int sock; // set to 0 by the constructor

    // Fills in the header for this message type and clears the socket field.
    NewUserJoin()
        : sock(0)
    {
        cmd = CMD_NEW_USER_JOIN;
        dataLength = sizeof(NewUserJoin);
    }
};
int processor(SOCKET sock)
{
char szRecv[4096] = {};
int recvLen = recv(sock, szRecv, sizeof(DataHeader), 0);
DataHeader *pHeader = (DataHeader*)szRecv;
if (recvLen <= 0)
{
printf("与服务器断开连接,任务结束...");
return -1;
}
switch (pHeader->cmd)
{
case CMD_LOGIN_RESULT:
{
recv(sock, szRecv + sizeof(DataHeader), pHeader->dataLength - sizeof(DataHeader), 0);
LoginResult *loginRes = (LoginResult*)szRecv;
printf("收到服务器消息:CMD_LOGIN_RESULT, 数据长度:%d, result:%d\n",
loginRes->dataLength, loginRes->result);
}
break;
case CMD_LOGOUT_RESULT:
{
recv(sock, szRecv + sizeof(DataHeader), pHeader->dataLength - sizeof(DataHeader), 0);
LogoutResult *logoutRes = (LogoutResult*)szRecv;
printf("收到服务器消息:CMD_LOGOUT_RESULT, 数据长度:%d, result:%d\n",
logoutRes->dataLength, logoutRes->result);
}
case CMD_NEW_USER_JOIN:
{
recv(sock, szRecv + sizeof(DataHeader), pHeader->dataLength - sizeof(DataHeader), 0);
NewUserJoin *userJoin = (NewUserJoin*)szRecv;
printf("收到服务器消息:CMD_NEW_USER_JOIN, 数据长度:%d\n",
userJoin->dataLength);
}
break;
}
return 0;
}
// Run flag for main()'s receive loop. cmdThread() clears it from another
// thread, so it is atomic to avoid a data race with the read in main().
std::atomic<bool> g_bRun(true);
// Console command loop, run on its own thread.
//
// Reads whitespace-delimited commands from stdin:
//   exit   - clears g_bRun and ends the thread
//   login  - sends a Login message with fixed credentials
//   logout - sends a Logout message
// Anything else prints a hint and prompts again.
//
// sock: connected socket the requests are sent on.
void cmdThread(SOCKET sock)
{
    while (true)
    {
        char cmdBuf[128] = { 0 };
        printf("请输入命令:[exit | login | logout | other]\n");
        // The field width keeps the read inside cmdBuf. A result other than 1
        // means EOF or a read error on stdin; treat it like "exit" so the
        // loop cannot spin forever on a closed input stream.
        const bool gotInput = (scanf("%127s", cmdBuf) == 1);
        if (!gotInput || 0 == strcmp(cmdBuf, "exit"))
        {
            g_bRun = false;
            printf("退出cmdThread线程...\n");
            break;
        }
        else if (0 == strcmp(cmdBuf, "login"))
        {
            Login login;
            strcpy(login.userName, "ccf");
            strcpy(login.passWord, "ccfPwd");
            send(sock, (const char*)&login, sizeof(login), 0);
        }
        else if (0 == strcmp(cmdBuf, "logout"))
        {
            Logout logout;
            strcpy(logout.userName, "ccf");
            send(sock, (const char*)&logout, sizeof(logout), 0);
        }
        else
        {
            printf("不支持的命令,请重新输入.\n");
        }
    }
}
int main(int argc, char *argv[])
{
WSADATA wsaData;
int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != NO_ERROR)
{
printf("WSAStartup() 错误,创建套接字库失败!\n");
return -1;
}
SOCKET ConnectSocket;
ConnectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ConnectSocket == INVALID_SOCKET) {
printf("创建套接字失败: %ld\n", WSAGetLastError());
WSACleanup();
return -1;
}
sockaddr_in clientService;
clientService.sin_family = AF_INET;
clientService.sin_addr.s_addr = inet_addr("127.0.0.1");
clientService.sin_port = htons(27015);
if (connect(ConnectSocket, (SOCKADDR*)&clientService, sizeof(clientService)) == SOCKET_ERROR) {
printf("连接服务器失败.\n");
WSACleanup();
return -1;
}
printf("客户端成功连接到服务器.\n");
std::thread thread_(cmdThread, ConnectSocket);
thread_.detach();
while (g_bRun)
{
fd_set fdReads;
FD_ZERO(&fdReads);
FD_SET(ConnectSocket, &fdReads);
timeval timeout = { 0, 0 };
int ret = select(ConnectSocket, &fdReads, NULL, NULL, &timeout);
if (ret < 0)
{
printf("select任务结束1...\n");
break;
}
if (FD_ISSET(ConnectSocket, &fdReads))
{
if (-1 == processor(ConnectSocket))
{
printf("select任务结束2\n");
break;
}
}
}
WSACleanup();
printf("客户端已退出...\n");
getchar();
return 0;
}
之前在CSDN上看到一篇博客,是基于UDP的Linux C++简单聊天室实现。我把源代码重新整理后放在了个人的GitHub上(项目名:基于UDP的Linux C++简单聊天室),同样是使用select模型实现的,有兴趣可以看一下。