【问题标题】:Is it OK to share the same epoll file descriptor among threads?在线程之间共享相同的 epoll 文件描述符是否可以?
【发布时间】:2015-09-30 04:10:04
【问题描述】:

在多个线程之间共享同一个 Epoll fd(不是 socket fd)是否安全?如果是这样,每个线程是否必须将自己的事件数组传递给epoll_wait(2),或者他们可以共享它吗?

例如

    void *thread_func(void *thread_args) {
      // extract socket_fd, epoll_fd, &event, &events_array from 
      //     thread_args
      // epoll_wait() using epoll_fd and events_array received from main
      // now all threads would be using same epoll_fd and events array 
    }

    void main( void ) {
      // create and bind to socket
      // create events_fd
      // allocate memory for events array
      // subscribe to events EPOLLIN and EPOLLET
      // pack the socket_fd, epoll_fd, &events, &events_array into 
      //   thread_args struct.

      // create multiple threads and pass thread_func and 
      //   same thread_args to all threads
    }

或者这样做更好:

    void *thread_func(void *socket_fd) {
      // create events_fd
      // allocate memory for events array
      // subscribe to events EPOLLIN and EPOLLET
      // epoll_wait using own epoll_fd and events_array
      // now all threads would have a separate epoll_fd with 
      //   events populated on its own array
   }

   void main(void) {
     // create and bind to socket

     //create multiple threads and pass thread_func and socket_fd to 
     //  all threads
   }

有没有一个很好的例子来说明如何在 C 中做到这一点?我看到的示例在main() 中运行事件循环,并在检测到事件时生成一个新线程来处理请求。我想做的是在程序开始时创建特定数量的线程,并让每个线程运行事件循环并处理请求。

【问题讨论】:

    标签: c linux pthreads epoll


    【解决方案1】:

    在几个人之间共享同一个 Epoll fd(不是 socket fd)是否安全 线程。

    是的,它是安全的——epoll(7) 接口是线程安全的——但是这样做时你应该小心,你至少应该使用EPOLLET(边缘触发模式,而不是默认的电平触发) 以避免其他线程中的虚假唤醒。这是因为电平触发模式会在有新事件可供处理时唤醒每个线程。由于只有一个线程会处理它,这会不必要地唤醒大多数线程。

    如果使用共享 epfd,每个线程都必须传递自己的事件 数组或共享事件数组到 epoll_wait()

    是的,您需要在每个线程上都有一个单独的事件数组,否则您将遇到竞争条件并且可能会发生令人讨厌的事情。例如,您可能有一个线程仍在迭代epoll_wait(2) 返回的事件并处理请求,而突然另一个线程使用相同的数组调用epoll_wait(2),然后在另一个线程被覆盖的同时事件被覆盖阅读它们。不好! 绝对每个线程都需要一个单独的数组。

    假设每个线程都有一个单独的数组,两种可能性 - 等待同一个 epoll fd 或每个线程有一个单独的 epoll fd - 都可以正常工作,但请注意语义不同。使用全局共享的 epoll fd,每个线程都等待来自 any 客户端的请求,因为客户端都被添加到同一个 epoll fd。每个线程都有一个单独的 epoll fd,那么每个线程本质上负责客户端的子集(那些被该线程接受的客户端)。

    这可能与您的系统无关,或者可能会产生巨大的影响。例如,一个线程可能很不幸地获得了一组发出繁重和频繁请求的高级用户,从而使该线程过度工作,而其他具有不那么激进的客户端的线程几乎处于空闲状态。那岂不是不公平?另一方面,也许您希望只有一些线程处理特定类别的用户,在这种情况下,在每个线程上使用不同的 epoll fds 可能是有意义的。像往常一样,您需要考虑这两种可能性,权衡取舍,考虑您的具体问题,然后做出决定。

    以下是使用全局共享的 epoll fd 的示例。我最初并不打算做所有这些,但一件事导致了另一件事,而且,这很有趣,我认为它可能会帮助你开始。它是一个监听 3000 端口的 echo 服务器,有一个由 20 个线程组成的池,使用 epoll 来同时接受新的客户端和服务请求。

    #include <stdio.h>
    #include <stdlib.h>
    #include <inttypes.h>
    #include <errno.h>
    #include <string.h>
    #include <pthread.h>
    #include <assert.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>
    
    #define SERVERPORT 3000
    #define SERVERBACKLOG 10
    #define THREADSNO 20
    #define EVENTS_BUFF_SZ 256
    
    static int serversock;
    static int epoll_fd;
    static pthread_t threads[THREADSNO];
    
    int accept_new_client(void) {
    
        int clientsock;
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);
        if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
            return -1;
        }
    
        char ip_buff[INET_ADDRSTRLEN+1];
        if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
            close(clientsock);
            return -1;
        }
    
        printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
               ip_buff, ntohs(addr.sin_port));
    
        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLET;
        epevent.data.fd = clientsock;
    
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
            perror("epoll_ctl(2) failed attempting to add new client");
            close(clientsock);
            return -1;
        }
    
        return 0;
    }
    
    int handle_request(int clientfd) {
        char readbuff[512];
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);
        ssize_t n;
    
        if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
            return -1;
        }
    
        if (n == 0) {
            return 0;
        }
    
        readbuff[n] = '\0';
    
        if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
            return -1;
        }
    
        char ip_buff[INET_ADDRSTRLEN+1];
        if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
            return -1;
        }
    
        printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
               ip_buff, ntohs(addr.sin_port), readbuff);
    
        ssize_t sent;
        if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
            return -1;
        }
    
        readbuff[sent] = '\0';
    
        printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
               ip_buff, ntohs(addr.sin_port), readbuff);
    
        return 0;
    }
    
    void *worker_thr(void *args) {
        struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
        if (events == NULL) {
            perror("malloc(3) failed when attempting to allocate events buffer");
            pthread_exit(NULL);
        }
    
        int events_cnt;
        while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
            int i;
            for (i = 0; i < events_cnt; i++) {
                assert(events[i].events & EPOLLIN);
    
                if (events[i].data.fd == serversock) {
                    if (accept_new_client() == -1) {
                        fprintf(stderr, "Error accepting new client: %s\n",
                            strerror(errno));
                    }
                } else {
                    if (handle_request(events[i].data.fd) == -1) {
                        fprintf(stderr, "Error handling request: %s\n",
                            strerror(errno));
                    }
                }
            }
        }
    
        if (events_cnt == 0) {
            fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
        } else {
            perror("epoll_wait(2) error");
        }
    
        free(events);
    
        return NULL;
    }
    
    int main(void) {
        if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
            perror("socket(2) failed");
            exit(EXIT_FAILURE);
        }
    
        struct sockaddr_in serveraddr;
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_port = htons(SERVERPORT);
        serveraddr.sin_addr.s_addr = INADDR_ANY;
    
        if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
            perror("bind(2) failed");
            exit(EXIT_FAILURE);
        }
    
        if (listen(serversock, SERVERBACKLOG) < 0) {
            perror("listen(2) failed");
            exit(EXIT_FAILURE);
        }
    
        if ((epoll_fd = epoll_create(1)) < 0) {
            perror("epoll_create(2) failed");
            exit(EXIT_FAILURE);
        }
    
        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLET;
        epevent.data.fd = serversock;
    
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
            perror("epoll_ctl(2) failed on main server socket");
            exit(EXIT_FAILURE);
        }
    
        int i;
        for (i = 0; i < THREADSNO; i++) {
            if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
                perror("pthread_create(3) failed");
                exit(EXIT_FAILURE);
            }
        }
    
        /* main thread also contributes as worker thread */
        worker_thr(NULL);
    
        return 0;
    }
    

    几点说明:

    • main() 应该返回 int,而不是 void(如您在示例中所示)
    • 始终处理错误返回代码。忽略它们是很常见的,当事情破裂时,很难知道发生了什么。
    • 代码假定没有请求大于 511 字节(从 handle_request() 中的缓冲区大小可以看出)。如果请求大于此值,则可能某些数据会在套接字中保留很长时间,因为epoll_wait(2) 不会在该文件描述符上发生新事件之前报告它(因为我们正在使用@987654331 @)。在最坏的情况下,客户端可能永远不会真正发送任何新数据,并永远等待回复。
    • 打印每个请求的线程标识符的代码假定pthread_t 是一个不透明的指针类型。确实,pthread_t在Linux中是指针类型,但在其他平台可能是整数类型,所以这个是不可移植的。不过,这可能不是什么大问题,因为 epoll 是特定于 Linux 的,所以代码无论如何都不可移植。
    • 它假定当线程仍在为来自该客户端的请求提供服务时,没有来自同一客户端的其他请求到达。如果同时有一个新请求到达并且另一个线程开始为它提供服务,我们就有一个竞争条件,客户端不一定会按照他发送它们的顺序接收回显消息(但是,write(2) 是原子的,所以当回复可能出现故障,它们不会穿插)。

    【讨论】:

    • 感谢您的全面回答。这很有帮助。
    • @MiJo 很高兴我能帮上忙。这是一个很好的问题:)
    • @Filipe Gonçalves 我有个问题:多线程共享epoll fd时,如何处理多线程之间的负载均衡
    猜你喜欢
    • 2012-12-03
    • 2014-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-15
    • 1970-01-01
    • 2016-03-10
    • 1970-01-01
    相关资源
    最近更新 更多