【问题标题】:Is this the correct way to use pthread?这是使用 pthread 的正确方法吗?
【发布时间】:2020-11-16 02:08:59
【问题描述】:

我正在尝试使用多线程创建 HTTP 服务器。 main() 将 client_sock 从 accept() 交给工作线程之一。如果没有可用的工作线程,它会一直等到有一个。我被限制不能在工作线程中调用 accept()。到目前为止,这是我的代码的一部分。我的一些问题是:

  1. 我是否需要像现在一样使用 2 个 pthread 互斥体和条件变量?
  2. 在这些情况下我是否需要使用 pthread 锁定或解锁?
  3. 如果我想在服务器上创建文件时添加一个互斥锁,我是否必须创建另一个互斥变量或者现有的一个变量可以工作?
#include <iostream>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>

#define SIZE 1024

struct shared_data
{
    int redundancy;
    int client_sock;
    int working_threads;
    int dispatch_ready;
    pthread_mutex_t* dispatch_mutex;
    pthread_mutex_t* worker_mutex;
    pthread_cond_t* dispatch_cond;
    pthread_cond_t* worker_cond;
};

void* receiveAndSend(void* obj)
{
    struct shared_data* data = (struct shared_data*) obj;

    int bytes;
    char buff[SIZE + 1];

    while(1)
    {
        while(!data->dispatch_ready)
        {
            pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
        }
        data->dispatch_ready = 0;

        data->working_threads++;

        client_sock = data->client_sock;

        bytes = recv(client_sock, buff, SIZE, 0);

        // do work

        data->working_threads--;
        pthread_cond_signal(data->worker_cond);
    }
}

int main(int argc, char* argv[])
{
    if(argc < 2 || argc > 6)
    {
        char msg[] = "Error: invalid arg amount\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    char* addr = NULL;
    unsigned short port = 80;
    int num_threads = 4;
    int redundancy = 0;
    char opt;

    while((opt = getopt(argc, argv, "N:r")) != -1)
    {
        if(opt == 'N')
        {
            num_threads = atoi(optarg);

            if(num_threads < 1)
            {
                char msg[] = "Error: invalid input for -N argument\n";
                write(STDERR_FILENO, msg, strlen(msg));
                exit(1);
            }
        }
        else if(opt == 'r')
        {
            redundancy = 1;
        }
        else
        {
            // error (getopt automatically sends an error message)
            return 1;
        }
    }

    // non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
    // optind is the next index of argv after all options
    if(optind < argc)
    {
        addr = argv[optind];
        optind++;
    }

    if(optind < argc)
    {
        port = atoi(argv[optind]);
    }

    if(addr == NULL)
    {
        char msg[] = "Error: no address specified\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = getaddr(addr);
    serv_addr.sin_port = htons(port);

    int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
    if(serv_sock < 0)
    {
        err(1, "socket()");
    }

    if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
    {
        err(1, "bind()");
    }

    if(listen(serv_sock, 500) < 0)
    {
        err(1, "listen()");
    }

    // Connecting with a client
    struct sockaddr client_addr;
    socklen_t client_addrlen;

    pthread_mutex_t dispatch_mutex;
    pthread_mutex_init(&dispatch_mutex, NULL);
    pthread_mutex_t worker_mutex;
    pthread_mutex_init(&worker_mutex, NULL);
    pthread_cond_t dispatch_cond;
    pthread_cond_init(&dispatch_cond, NULL);
    pthread_cond_t worker_cond;
    pthread_cond_init(&worker_cond, NULL);

    struct shared_data data;

    data.redundancy = redundancy;
    data.dispatch_ready = 0;
    data.working_threads = 0;
    data.dispatch_mutex = &dispatch_mutex;
    data.worker_mutex = &worker_mutex;
    data.dispatch_cond = &dispatch_cond;
    data.worker_cond = &worker_cond;

    pthread_t* threads = new pthread_t[num_threads];
    for (int i = 0; i < num_threads; i++)
    {
        pthread_create(&threads[i], NULL, receiveAndSend, &data);
    }

    while(1)
    {
        data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);

        while(data.working_threads == num_threads)
        {
            pthread_cond_wait(data.worker_cond, data.worker_mutex);
        }

        data.dispatch_ready = 1;
        pthread_cond_signal(data.dispatch_cond);
    }

    return 0;
}

【问题讨论】:

  • 为什么有一个复杂的切换协议,而不是让每个空闲的工作线程自己调用accept
  • 这是一个大学作业,我只是问我们是否可以这样做,但不幸的是我们不能。
  • 在这种情况下,这是有道理的——他们正在教授一个可用于许多其他工作线程应用程序的原理,仅以 webserver 为例。
  • 不要在 C 程序中包含

标签: c pthreads mutex thread-synchronization


【解决方案1】:

您的程序中有许多非常基本的错误,这些错误非常清楚地表明您不了解锁和条件变量(或正确使用指针)。

锁保护一些共享数据。您只有一个共享数据项,因此您应该只需要一个锁(互斥锁)来保护它。

条件变量表示某些条件为真。您的用例的合理条件是worker_availablework_available。 (将条件变量命名为 dispatch_condworker_cond 并不能帮助清晰。)

条件变量总是与一个互斥锁相关联,但您不需要两个单独的互斥锁,因为您有两个条件变量。


关于错误。

这段代码显然有问题:

    while(1)
    {
        while(!data->dispatch_ready)
        {
            pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
        }

来自man pthread_cond_wait

原子地释放mutex并导致调用线程阻塞条件变量cond

如果这个线程从未获得过互斥锁,它如何释放它?
另外,这个线程如何在不获取互斥体的情况下读取data-&gt;dispatch_ready(与其他线程共享)?

这段代码:

    struct shared_data data;

    data.redundancy = redundancy;
    data.dispatch_ready = 0;
    data.working_threads = 0;
    data.dispatch_mutex = &dispatch_mutex;
    data.worker_mutex = &worker_mutex;
    data.dispatch_cond = &dispatch_cond;
    data.worker_cond = &worker_cond;

没有问题,但有不必要的间接性。您可以将dispatch_mutex 和条件变量设为shared_datapart,如下所示:

struct shared_data
{
    int redundancy;
    int client_sock;
    int working_threads;
    int dispatch_ready;
    pthread_mutex_t dispatch_mutex;
    pthread_mutex_t worker_mutex;
    pthread_cond_t dispatch_cond;
    pthread_cond_t worker_cond;
};

这是我注意到的最微妙的错误:

        data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
...
        data.dispatch_ready = 1;
        pthread_cond_signal(data.dispatch_cond);

在这里,您将唤醒等待dispatch_cond 的线程中的at least one,但可能会唤醒多个。如果多个线程被唤醒,它们都将在同一个client_sock 上继续到recv,这可能会导致灾难性的后果。

更新:

我该如何解决这个问题。

解决这个问题的最好和最有效的方法可能是拥有一个受锁保护的“工作项”队列(例如使用带有头尾指针的双链表)。

主线程会在尾部添加元素(同时持有锁),并发出“非空”条件变量的信号。

工作线程会移除 head 元素(同时持有锁)。
当队列为空时,工作线程将阻塞“非空”条件变量。

当队列已满(所有工作人员都忙)时,主线程可能会继续添加元素,或者它可能会阻塞等待工作人员变得可用,或者它可以向客户端返回“429 太多请求”。

【讨论】:

  • 谢谢!对于您答案的最后一部分,pthread_cond_signal 不是只唤醒一个等待该条件的线程吗?我以为只有pthread_cond_broadcast 把他们都吵醒了。
  • @Justin 你说得部分正确。手册页说“pthread_cond_signal() 函数应至少解除阻塞在指定条件变量 cond 上阻塞的线程之一(如果任何线程在 cond 上阻塞)。”它可能会取消阻止多个。
  • 即使多个线程从pthread_cond_signal 唤醒,难道不只有一个线程会离开while 循环吗?
  • @Justin No.“唤醒”的意思是“从pthread_cond_wait返回”。他们都将几乎同时检查dispatch_ready,并且所有可能观察值1并退出循环。
  • @Justin accept 是一个传入连接队列,但您练习的重点是能够处理多个客户端,因此您不能使用accept 作为您唯一的排队机制。
猜你喜欢
  • 2012-05-11
  • 2018-12-31
  • 2021-04-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-01-05
  • 2017-02-01
相关资源
最近更新 更多