[Posted]: 2016-04-20 12:22:06
[Question]:
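I am trying to build an HTTP proxy in which some connections get higher priority than others, based on the hostname in the GET/CONNECT line of the HTTP request.
The idea is to serve requests according to a given list of hostnames, each of which has a specific priority.
Pending connections are stored by the accepter thread in four different queues (one per priority level: maximum, medium, minimum, and unclassified); the accepter then fork()s a child process, which dequeues and handles the pending connections in priority order. This way the accepter thread keeps accepting new connections, and each enqueued connection is handled by a child process.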
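The queueing scheme described above can be sketched in isolation. This is only an illustration under assumptions: the `Priority` enum, `classify`, `PriorityQueues` and `pop_highest` are hypothetical names that do not appear in the question's code, and the queues hold plain hostnames rather than connection structs.

```cpp
#include <array>
#include <cstddef>
#include <map>
#include <queue>
#include <string>

// Hypothetical priority levels; index 0 is served first.
enum Priority { kHigh = 0, kMedium = 1, kLow = 2, kUnclassified = 3 };

// Look up a hostname in a priority table; unknown hosts are unclassified.
Priority classify(const std::map<std::string, Priority>& table,
                  const std::string& host) {
    std::map<std::string, Priority>::const_iterator it = table.find(host);
    return it == table.end() ? kUnclassified : it->second;
}

// One FIFO queue per priority level, indexed by Priority.
typedef std::array<std::queue<std::string>, 4> PriorityQueues;

// Pop from the highest-priority non-empty queue.
// Returns false when every queue is empty.
bool pop_highest(PriorityQueues& queues, std::string& out) {
    for (std::size_t i = 0; i < queues.size(); ++i) {
        if (!queues[i].empty()) {
            out = queues[i].front();
            queues[i].pop();
            return true;
        }
    }
    return false;
}
```

A table lookup keeps the hostname list in one place, so adding a host does not require another branch in the accept loop.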
In short, this is my proxy:
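- main: opens a TCP socket, binds it to the given port, listens with a backlog of 10 connections, starts the accepter thread, passing it the socket fd returned by the earlier socket() call, and joins that thread;
- accepter: this thread takes the socket fd passed from main and loops on accept(), which returns the client socket; it recv()s from the client, parses the request and, depending on the hostname in the HTTP request, enqueues my custom struct in the appropriate queue; then it fork()s, so that a process dequeues and handles the connection;
- manageConnection: this process, forked by accepter, pops from the queue, reads the parsed hostname field of the popped struct, opens a client socket, connects to the server and, for either GET or CONNECT, serves the request.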
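One property of the fork()-based design above is worth spelling out: fork() gives the child a copy of the parent's address space, so a queue popped in the child is not the queue the accepter keeps pushing to. A minimal sketch of that behaviour follows; the function name and the integers standing in for socket descriptors are made up for illustration.

```cpp
#include <queue>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Fills a queue, forks, lets the child pop one element, and returns the
// size of the queue as seen by the parent afterwards (-1 if fork() fails).
int parent_queue_size_after_child_pop() {
    std::queue<int> pending;
    pending.push(4);  // stand-ins for client socket descriptors
    pending.push(5);
    pending.push(6);

    pid_t pid = fork();
    if (pid < 0)
        return -1;
    if (pid == 0) {
        // Child: this pops from the child's private copy of the queue.
        pending.pop();
        _exit(0);
    }
    // Parent: wait for the child, then inspect our own, untouched queue.
    int status = 0;
    waitpid(pid, &status, 0);
    return static_cast<int>(pending.size());
}
```

The parent still sees all three elements after the child has popped one, so under this design the accepter's queues would only ever grow.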
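New proxy: no more fork(). I created a thread pool of four threads (one "accepter" and three "connecters"; since I plan to put this proxy on my RPi 2, which has a quad-core processor, I figured at least four threads would be good). I now have one mutex and two condition_variables. The code is almost the same, apart from the threads, the mutex, and the condition variables. These are the new functions run by the threads: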
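- enqueue: this thread contains the accept() loop; it receives from the client, parses the HTTP request, finds the hostname and, according to its priority, enqueues an info_conn struct (typedef'd at the top of the code);
- dequeue: this thread contains the dequeue-and-manage-connection loop; it takes an info_conn struct from a queue, retrieves the client socket (obtained in the accept() loop), resolves the hostname, and handles the GET or CONNECT request.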
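The problem: still the same. While handling a CONNECT request, recv() from the client always returns 0. I know recv() returns 0 when the other end of the connection has disconnected, but that is not what I want!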
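With the thread-based approach this is a trivial producer/consumer problem (popping from and pushing to queues), so I believe the alternation between the enqueuing and dequeuing threads is correct.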
My (new) code:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <thread>
#include <iostream>
#include <netdb.h>
#include <queue>
#include <list>
#include <vector>
#include <condition_variable>
#include <cstdlib>
using namespace std;
#define GET 0
#define CONNECT 1
#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"
#define MAX_SIZE 1000
#define CONNECT_200_OK "HTTP/1.1 200 Connection established\r\nProxy-agent: myproxy\r\n\r\n"
// my custom struct stored in queues
typedef struct info_connection {
    int client_fd;
    string host;
    string payload;
    int request;
} info_conn;
queue<info_conn>q1;
queue<info_conn>q2;
queue<info_conn>q3;
queue<info_conn>q4;
vector<thread> workers;
condition_variable cond_read, cond_write;
mutex mtx;
void enqueue(int sock_client);
void dequeue(void);
int main(int argc, char *argv[]) {
    int socket_desc;
    struct sockaddr_in server;
    socket_desc = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket_desc == -1) {
        perror("socket()");
        exit(-1);
    }
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    if (argc == 2)
        server.sin_port = htons(atoi(argv[1]));
    printf("listening to port %d\n", atoi(argv[1]));
    if (bind(socket_desc,(struct sockaddr *)&server, sizeof(server)) < 0) {
        perror("bind failed. Error");
        exit(-1);
    }
    printf("binded\n");
    listen(socket_desc, 10);
    printf("listen\n");
    // thread pool, because I suck at forking
    workers.push_back(thread(enqueue, socket_desc));
    workers.push_back(thread(dequeue));
    workers.push_back(thread(dequeue));
    workers.push_back(thread(dequeue));
    for (thread& t : workers) {
        t.join();
    }
    return 0;
}
void enqueue(int sock_client) {
    printf("enqueue()\n");
    int client_sock;
    struct sockaddr_in *client_struct;
    unsigned int clilen;
    bzero((char*)&client_struct, sizeof(client_struct));
    clilen = sizeof(client_struct);
    char host_name[128];
    char buff[4096];
    int n_recv, n_send;
    char *start_row, *end_row, *tmp_ptr, *tmp_start;
    int req;
    while( (client_sock = accept(sock_client, (struct sockaddr *)&client_struct, &clilen)) ) {
        memset(host_name, 0, sizeof(host_name));
        n_recv = recv(client_sock, buff, sizeof(buff), 0);
        if (n_recv < 0) {
            perror("recv()");
            break;
        }
        start_row = end_row = buff;
        while ((end_row = strstr(start_row, "\r\n")) != NULL) {
            int row_len = end_row - start_row;
            if (row_len == 0)
                break;
            if (strncmp(buff, "GET ", 4) == 0) {
                req = GET;
                tmp_start = start_row + 4;
                tmp_ptr = strstr(tmp_start, "//");
                int len = tmp_ptr - tmp_start;
                tmp_start = tmp_start + len + 2;
                tmp_ptr = strchr(tmp_start, '/');
                len = tmp_ptr - tmp_start;
                strncpy(host_name, tmp_start, len);
                break;
            }
            else if (strncmp(buff, "CONNECT ", 8) == 0) {
                req = CONNECT;
                tmp_start = start_row + 8;
                tmp_ptr = strchr(tmp_start, ':');
                int host_len = tmp_ptr - tmp_start;
                strncpy(host_name, tmp_start, host_len);
                break;
            }
            start_row = end_row + 2;
            /* if ((start_row - buff) >= strlen(buff))
            break;*/
        }
        unique_lock<mutex> locker(mtx, defer_lock);
        locker.lock();
        cond_write.wait(locker, [](){
            return (q1.size() < MAX_SIZE || q2.size() < MAX_SIZE || q3.size() < MAX_SIZE || q4.size() < MAX_SIZE);
        });
        cout << "(DEBUG) thread " << this_thread::get_id() << " wants to insert, queues not full " <<
            q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n';
        int priority = 0;
        info_conn info_c;
        info_c.client_fd = client_sock;
        info_c.host = host_name;
        info_c.request = req;
        info_c.payload = string(buff);
        cout << "(DEBUG) thread " << this_thread::get_id() << " looking for " << host_name <<
            " queues" << '\n';
        if (strcmp(host_name, "www.netflix.com") == 0) {
            priority = 1;
            printf("hostname = www.netflix.com, priority %d\n", priority);
            q1.push(info_c);
        }
        else if (strcmp(host_name, "www.youtube.com") == 0) {
            priority = 2;
            printf("hostname = www.youtube.com, priority %d\n", priority);
            q2.push(info_c);
        }
        else if (strcmp(host_name, "www.facebook.com") == 0) {
            priority = 3;
            printf("hostname = www.facebook.com, priority %d\n", priority);
            q3.push(info_c);
        }
        else {
            priority = 4;
            printf("hostname %s not found in queues\n", host_name);
            q4.push(info_c);
        }
        cout << GREEN << "(DEBUG) thread " << this_thread::get_id() << " inserted " <<
            q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<< '\n';
        locker.unlock();
        cond_read.notify_all();
    }
    if (client_sock < 0) {
        perror("accept failed");
        exit(-1);
    }
}
void dequeue(void) {
    int fd_client = -1;
    int fd_server = -1;
    struct sockaddr_in server;
    int what_request;
    char host_name[128];
    char buffer[1500];
    int n_send, n_recv;
    size_t length;
    info_conn req;
    // CONNECT
    int r, max;
    int send_200_OK;
    int read_from_client = 0;
    int read_from_server = 0;
    int send_to_client = 0;
    int send_to_server = 0;
    struct timeval timeout;
    char buff[8192];
    fd_set fdset;
    printf("dequeue()\n");
    while (true) {
        unique_lock<mutex> locker(mtx, defer_lock);
        locker.lock();
        cond_read.wait(locker, [](){
            return (q1.size() > 0 || q2.size() > 0 || q3.size() > 0 || q4.size() > 0);
        });
        cout << "(DEBUG) thread " << this_thread::get_id() << " wants to remove, queues not empty " <<
            q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n';
        if (q1.size() > 0) {
            req = q1.front();
            q1.pop();
        }
        else if (q2.size() > 0) {
            req = q2.front();
            q2.pop();
        }
        else if (q3.size() > 0) {
            req = q3.front();
            q3.pop();
        }
        else if (q4.size() > 0) {
            req = q4.front();
            q4.pop();
        }
        cout << YELLOW <<"(DEBUG) thread " << this_thread::get_id() << " removed, " <<
            q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<<'\n';
        locker.unlock();
        // notify one, because I have only one "producer" thread
        cond_write.notify_one();
        fd_client = req.client_fd;
        //memcpy(host_name, req.host.c_str(), strlen(req.host));
        length = req.host.copy(host_name, req.host.size(), 0);
        host_name[length] = '\0';
        what_request = req.request;
        //memcpy(buffer, req.payload, req.payload.size());
        length = req.payload.copy(buffer, req.payload.size(), 0);
        buffer[length] = '\0';
        what_request = req.request;
        //cout << RED <<"(DEBUG) thread " << this_thread::get_id() << " copied packet payload " <<
        // buffer << DEFAULTCOLOR<<'\n';
        struct addrinfo* result;
        struct addrinfo* res;
        int error;
        struct sockaddr_in *resolve;
        fd_server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (fd_server < 0) {
            perror("socket()");
            exit(-1);
        }
        cout << "(DEBUG) thread " << this_thread::get_id() << " fd_server " << fd_server << '\n';
        error = getaddrinfo(host_name, NULL, NULL, &result);
        if (error != 0) {
            if (error == EAI_SYSTEM) {
                perror("getaddrinfo");
            } else {
                fprintf(stderr, "error in getaddrinfo for (%s): %s\n", host_name, gai_strerror(error));
            }
            exit(EXIT_FAILURE);
        }
        if (what_request == GET) {
            server.sin_port = htons(80);
        }
        else if (what_request == CONNECT) {
            server.sin_port = htons(443);
        }
        server.sin_family = AF_INET;
        cout << "(DEBUG) thread " << this_thread::get_id() << " getaddrinfo()" << '\n';
        for (res = result; res != NULL; res = res->ai_next) {
            if (res->ai_family == AF_INET) {
                resolve = (struct sockaddr_in *)res->ai_addr;
                //char *ip = inet_ntoa(resolve->sin_addr);
                //printf("%s\n", ip);
                server.sin_addr.s_addr = resolve->sin_addr.s_addr;
                if (connect(fd_server, (struct sockaddr *)&server, sizeof (struct sockaddr_in)) < 0) {
                    fflush(stdout);
                    perror("connect()");
                }
                else {
                    cout << "(DEBUG) thread " << this_thread::get_id() << " connected to " << inet_ntoa(server.sin_addr) << '\n';
                }
                break;
            }
        }
        // dealing with GET
        if (what_request == GET) {
            cout << "thread " << this_thread::get_id() << " dealing GET " << host_name <<
                " sending to server " << buffer << '\n';
            n_send = send(fd_server, buffer, strlen(buffer)+1, 0);
            if (n_send < 0) {
                cout << "thread " << this_thread::get_id() << " error sending GET request to server" << '\n';
                perror("send()");
                break;
            }
            do {
                memset(buffer, 0, sizeof(buffer));
                n_recv = recv(fd_server, buffer, sizeof(buffer), 0);
                cout << "thread " << this_thread::get_id() << " GET: " << host_name << " read from recv() " << n_recv << " bytes, " <<
                    fd_client << "<->" << fd_server << '\n';
                n_send = send(fd_client, buffer, n_recv, 0);
            } while (n_recv > 0);
            if (n_recv < 0) {
                cout << RED << "thread " << this_thread::get_id() << " error sending GET response from server to client" << DEFAULTCOLOR<<'\n';
                perror("send()");
                break;
            }
            close(fd_client);
            close(fd_server);
            cout << "thread " << this_thread::get_id() <<
                " done with GET request, quitting\n";
        }
        // dealing with CONNECT
        else if (what_request == CONNECT) {
            cout << "thread " << this_thread::get_id() << " dealing CONNECT " << host_name << '\n';
            max = fd_server >= fd_client ? fd_server+1 : fd_client+1;
            send_200_OK = send(fd_client, CONNECT_200_OK, sizeof(CONNECT_200_OK), 0);
            if (send_200_OK < 0) {
                perror("send() 200 OK to client");
                break;
            }
            cout << "thread " << this_thread::get_id() << " SENT 200 OK to client " << '\n';
            int tot_recvd;
            int tot_sent;
            // TCP tunnel
            while(true) {
                memset(buff, 0, sizeof(buff));
                FD_ZERO(&fdset);
                FD_SET(fd_client, &fdset);
                FD_SET(fd_server, &fdset);
                timeout.tv_sec = 15;
                timeout.tv_usec = 0;
                r = select(max, &fdset, NULL, NULL, &timeout);
                if (r < 0) {
                    perror("select()");
                    close(fd_client);
                    close(fd_server);
                    break;
                }
                if (r == 0) { // select timed out
                    printf("tunnel(): select() request timeout 408\n");
                    close(fd_client);
                    close(fd_server);
                    break;
                }
                if (FD_ISSET(fd_client, &fdset)) {
                    tot_recvd = 0;
                    tot_sent = 0;
                    do {
                        read_from_client = recv(fd_client, &(buff[tot_recvd]), sizeof(buff), 0);
                        tot_recvd += read_from_client;
                        cout << "thread " << this_thread::get_id() <<
                            " select(), reading from client " << fd_client <<
                            " " << read_from_client << " bytes, " << fd_client<< " <-> " << fd_server<<'\n';
                        if (buff[tot_recvd-1] == '\0') {
                            break;
                        }
                    } while (read_from_client > 0);
                    if (read_from_client < 0) {
                        perror("recv()");
                        close(fd_client);
                        close(fd_server);
                        break;
                    }
                    if (read_from_client == 0) {
                        // this always happens!!!
                    }
                    send_to_server = send(fd_server, buff, read_from_client, 0);
                    if (send_to_server < 0) {
                        perror("send() to client");
                        close(fd_client);
                        close(fd_server);
                        break;
                    }
                }
                if (FD_ISSET(fd_server, &fdset)) {
                    tot_recvd = 0;
                    tot_sent = 0;
                    do {
                        read_from_server = recv(fd_server, &(buff[tot_recvd]), sizeof(buff), 0);
                        tot_recvd += read_from_server;
                        cout << "thread " << this_thread::get_id() <<
                            " select(), reading from server " << fd_client <<
                            " " << read_from_server << " bytes, " << fd_client<< " <-> " << fd_server<<'\n';
                        if (buff[tot_recvd-1] == '\0') {
                            break;
                        }
                    } while (read_from_server > 0);
                    if (read_from_server < 0) {
                        perror("read()");
                        close(fd_client);
                        close(fd_server);
                        break;
                    }
                    if (read_from_server == 0) {
                        cout << "thread " << this_thread::get_id() << " select(), server closed conn" << '\n';
                        close(fd_client);
                        close(fd_server);
                        break;
                    }
                    send_to_client = send(fd_client, buff, read_from_server, 0);
                    if (send_to_client < 0) {
                        perror("send() to client");
                        close(fd_client);
                        close(fd_server);
                        break;
                    }
                }
            }
            cout << "thread " << this_thread::get_id() << " done with CONNECT request\n";
        }
    }
}
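A detail in the CONNECT branch above that may be worth checking: the 200 reply is sent with sizeof(CONNECT_200_OK) as its length. For a string literal, sizeof counts the terminating '\0', so one extra zero byte reaches the client just before it starts its TLS handshake. A small sketch of the difference; the two function names are made up for illustration, and the macro is copied from the code above.

```cpp
#include <cstddef>
#include <cstring>

#define CONNECT_200_OK "HTTP/1.1 200 Connection established\r\nProxy-agent: myproxy\r\n\r\n"

// Bytes that sizeof reports for the literal: its characters plus the '\0'.
std::size_t reply_size_with_terminator() { return sizeof(CONNECT_200_OK); }

// Bytes in the HTTP reply itself, without the terminating '\0'.
std::size_t reply_size_on_wire() { return std::strlen(CONNECT_200_OK); }
```

Passing strlen(CONNECT_200_OK), or sizeof(CONNECT_200_OK) - 1, would send only the reply. The GET branch follows the same pattern with strlen(buffer)+1.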
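Environment: the proxy runs on my laptop with Ubuntu 14.04, x86_64; it was tested from Chrome with the SwitchyOmega extension, which redirects traffic to a given port (the same port I pass to my proxy); compiled with g++ -std=c++11 -pedantic -Wall -o funwithproxyfork funwithproxyfork.cpp -lpthread.
Output (tried with Netflix and YouTube; both show the same problem, i.e. client closed conn, recv() returns 0):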
req: 1, hostname: www.netflix.com, priority: 1
thread 5611 accepting again
(CHILD 5627) is about to handle conn
(CHILD 5627) popped sock_client 4, request 1
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4
thread 5611 accepting again
(CHILD 5628) is about to handle conn
(CHILD 5628) popped sock_client 4, request 1
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4
thread 5611 accepting again
(CHILD 5629) is about to handle conn
(CHILD 5629) popped sock_client 4, request 1
(CHILD 5627) attempting to connect to 54.247.92.196 (www.netflix.com)
(CHILD 5628) attempting to connect to 54.247.125.40 (www.netflix.com)
(CHILD 5629) attempting to connect to 54.247.110.247 (www.netflix.com)
(CHILD 5627) connected to www.netflix.com, dealing CONNECT request
(CHILD 5628) connected to www.netflix.com, dealing CONNECT request
(CHILD 5628) client closed conn
(CHILD 5627) client closed conn
(CHILD 5628) done with CONNECT request
(CHILD 5627) done with CONNECT request
req: 1, hostname: www.netflix.com, priority: 1
thread 5611 accepting again
(CHILD 5630) is about to handle conn
(CHILD 5630) popped sock_client 4, request 1
(CHILD 5630) attempting to connect to 176.34.188.125 (www.netflix.com)
(CHILD 5629) connected to www.netflix.com, dealing CONNECT request
(CHILD 5629) client closed conn
(CHILD 5629) done with CONNECT request
(CHILD 5630) connected to www.netflix.com, dealing CONNECT request
After that it prints nothing more.
[Comments]:
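- You do realize that your code has major design problems apart from the recv() issue, right? 1. If a client connects and keeps the connection open without sending a command, your server process will hang and will not accept any new connections until a command is received. 2. fork() clones the entire process space. A child process removing elements from the queue has no effect on the queue in the parent process; nothing is ever removed from it. Even with the recv() issue fixed, this code will not work.
- I think I got the second point, so I created a thread pool with 4 threads (one looping to accept connections, three to open and manage connections), since I have a quad-core ARM processor. As for the first point, I do not understand what you mean by the client "sending a command".
- However, even with threads and no forking, the client still sends 0 bytes to my proxy while it handles a CONNECT request.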
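On the first point raised in the comments: the accept loop calls recv() right after accept(), so a client that connects and then sends nothing keeps the only accepting thread blocked. One way to bound that wait is sketched here, assuming a fixed timeout is acceptable; `wait_readable` is a hypothetical helper built on poll(), not something from the question's code.

```cpp
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Wait up to timeout_ms for fd to become readable.
// Returns true if data (or EOF) is available, false on timeout or error.
bool wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int r = poll(&pfd, 1, timeout_ms);
    return r > 0 && (pfd.revents & (POLLIN | POLLHUP)) != 0;
}
```

The accept loop could call `wait_readable(client_sock, 5000)` before recv() and close the socket when it returns false; moving the initial recv() into the worker threads is another option.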
Tags: c++ multithreading sockets proxy fork