【问题标题】:Design of multi-threaded server in cc语言多线程服务器的设计
【发布时间】:2016-04-27 10:00:26
【问题描述】:

尝试在 linux 上实现具有并发支持的简单回显服务器时。

使用了以下方法:

  • 使用pthread函数创建一个线程池,并维护在一个链表中。它在进程启动时创建,并在进程终止时销毁。
  • 主线程将接受请求,并使用POSIX message queue 存储接受的套接字文件描述符。
  • 池循环中的线程从消息队列中读取,并处理它得到的请求,当没有请求时,它将阻塞。

该程序现在似乎正在运行。

问题是:

  • 中间用message queue是否合适,够效率吗?
  • 实现需要处理来自多个客户端的并发请求的线程工具的一般方法是什么?
  • 如果不适合让线程在池中循环和阻塞以从消息队列中检索 msg,那么如何将请求传递给线程?

【问题讨论】:

    标签: c linux multithreading pthreads message-queue


    【解决方案1】:

    这对我来说似乎是不必要的复杂。多线程服务器的常用方法是:

    • 在线程进程中创建监听套接字
    • 在线程中接受客户端连接
    • 对于每个接受的客户端连接,创建一个新线程,该线程接收相应的文件描述符并完成工作
    • 工作线程在完全处理完客户端连接后关闭它

    我看不出在这里预填充线程池有什么好处。

    如果你真的想要一个线程池:

    我只会使用一个链接列表来接受连接,并使用pthread_mutex 来同步对它的访问:

    • 侦听器进程将客户端 fds 排入列表尾部。
    • 客户端在头部将其出列。

    如果队列为空,线程可以等待一个变量 (pthread_cond_wait) 并在连接可用时由侦听器进程 (pthread_cond_signal) 通知。

    另一种选择

    根据处理请求的复杂性,可以选择使服务器单线程,即在一个线程中处理所有连接。这完全消除了上下文切换,因此性能非常好。

    一个缺点是只使用了一个 CPU 内核。为了改善这一点,可以使用混合模型:

    • 为每个内核创建一个工作线程。
    • 每个线程同时处理 n 个连接。

    但是,您必须实施机制以在工作人员之间公平分配工作。

    【讨论】:

    • 在添加池之前,旧版本与描述的相同:为每个请求创建一个新线程,并在请求完全处理后终止该线程。但是,即使创建一个线程并没有那么繁重,我猜,一个繁重的流量可能会从线程池中受益,所以我正在尝试实现一个。
    • @EricWang 为什么你对此不满意?
    • 我更新了评论。在大流量服务器中,线程池会比为每个请求创建/终止线程提供更好的性能吗?
    • @EricWang 我已经在答案中添加了一些想法,也许有什么启发你
    • @DavidSchwartz 假设我们有两个线程 A 和 B。A 非常忙,B 非常忙。两个线程都获得了 CPU 时间,并且偶然地,B 比 A 更频繁地到达一个点,即在新连接挂起时接受新连接(即出队),而不是 A(因为 A 在那段时间做其他事情)。您可能会认为,这会随着时间的推移而平衡,但根据所做工作的性质,情况通常并非如此。所以根本原因是,调度程序公平地分配 CPU 时间,而不是连接(一个进程可能以单个连接结束,而另一个进程有很多!)
    【解决方案2】:

    除了使用 pthread_mutex,您还需要使用 pthread_cond_t(pthread 条件),这将允许您在线程池中的线程实际上不工作时让它们进入睡眠状态。否则,如果它们在循环中检查工作队列中的某些内容,您将浪费计算周期。

    我肯定会考虑使用 C++ 而不仅仅是纯 C。我建议它的原因是在 C++ 中您可以使用模板。使用纯虚拟基类(让我们称之为:“vtask”),您可以创建模板派生类,它们接受参数并在调用重载 operator() 时插入参数,从而在您的任务中实现更多功能:

    //============================================================================//
    
    void* thread_pool::execute_thread()
    {
        vtask* task = NULL;
        while(true)
        {
            //--------------------------------------------------------------------//
            // Try to pick a task
            m_task_lock.lock();
            //--------------------------------------------------------------------//
    
            // We need to put condition.wait() in a loop for two reasons:
            // 1. There can be spurious wake-ups (due to signal/ENITR)
            // 2. When mutex is released for waiting, another thread can be waken up
            //    from a signal/broadcast and that thread can mess up the condition.
            //    So when the current thread wakes up the condition may no longer be
            //    actually true!
            while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
            {
                // Wait until there is a task in the queue
                // Unlock mutex while wait, then lock it back when signaled
                m_task_cond.wait(m_task_lock.base_mutex_ptr());
            }
    
            // If the thread was waked to notify process shutdown, return from here
            if (m_pool_state == state::STOPPED)
            {
                //m_has_exited.
                m_task_lock.unlock();
                //----------------------------------------------------------------//
                if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
                   tids.find(CORETHREADSELF()) != tids.end())
                    mad::details::allocator_list_tl::get_allocator_list()
                            ->Destroy(tids.find(CORETHREADSELF())->second, 1);
                //----------------------------------------------------------------//
    
                CORETHREADEXIT(NULL);
            }
    
            task = m_main_tasks.front();
            m_main_tasks.pop_front();
            //--------------------------------------------------------------------//
            //run(task);
            // Unlock
            m_task_lock.unlock();
            //--------------------------------------------------------------------//
    
            // execute the task
            run(task);
    
            m_task_count -= 1;
            m_join_lock.lock();
            m_join_cond.signal();
            m_join_lock.unlock();
    
            //--------------------------------------------------------------------//
        }
        return NULL;
    }
    
    //============================================================================//
    
    int thread_pool::add_task(vtask* task)
    {
    #ifndef ENABLE_THREADING
        run(task);
        return 0;
    #endif
    
        if(!is_alive_flag)
        {
            run(task);
            return 0;
        }
    
        // do outside of lock because is thread-safe and needs to be updated as
        // soon as possible
        m_task_count += 1;
    
        m_task_lock.lock();
    
        // if the thread pool hasn't been initialize, initialize it
        if(m_pool_state == state::NONINIT)
            initialize_threadpool();
    
        // TODO: put a limit on how many tasks can be added at most
        m_main_tasks.push_back(task);
    
        // wake up one thread that is waiting for a task to be available
        m_task_cond.signal();
    
        m_task_lock.unlock();
    
        return 0;
    }
    
    //============================================================================//
    
    void thread_pool::run(vtask*& task)
    {
        (*task)();
    
        if(task->force_delete())
        {
            delete task;
            task = 0;
        } else {
            if(task->get() && !task->is_stored_elsewhere())
                save_task(task);
            else if(!task->is_stored_elsewhere())
            {
                delete task;
                task = 0;
            }
        }
    }
    

    在上面,每个创建的线程都会运行 execute_thread(),直到 m_pool_state 设置为 state::STOPPED。您锁定 m_task_lock,如果状态不是 STOPPED 并且列表为空,则将 m_task_lock 传递给您的条件,这会使线程进入睡眠状态并释放锁。您创建任务(未显示),添加任务(顺便说一句,m_task_count 是原子的,这就是它是线程安全的原因)。在添加任务期间,发出条件信号唤醒一个线程,在获取并锁定 m_task_lock 后,该线程从 execute_thread() 的 m_task_cond.wait(m_task_lock.base_mutex_ptr()) 部分继续进行。

    注意:这是一个高度定制的实现,它将大多数 pthread 函数/对象包装到 C++ 类中,因此复制和粘贴将不起作用......对不起。而且w.r.t。 thread_pool::run(),除非您担心返回值,否则您只需要 (*task)() 行。

    我希望这会有所帮助。

    编辑:m_join_* 引用用于检查是否所有任务都已完成。主线程处于类似的条件等待状态,检查是否所有任务都已完成,因为这对于我在继续之前使用此实现的应用程序是必需的。

    【讨论】:

    • 是的,结合pthread_cond_t,创建生产者/消费者模型很有用,谢谢。
    • 这通常被称为工作窃取模型,如果您使用 C++,您最简单的实现可能就是使用 TBB,我强烈推荐。在比较计算时间的基准实现中,此实现的性能与 TBB 一样好,但 TBB 实现要简单得多。我们追求这个实现的唯一原因是因为这个实现几乎需要对现有代码进行零重写,而实现 TBB 则需要几乎完全重写(尽管如果我们能够使用 C++11 lambdas 则不需要,这是我们无法做到的) )。
    • 我对C++很陌生,但有计划提高技能,有时间会参考代码。
    • 我完成了迁移到使用pthread_mutex_t & pthread_condition_t,并使用链表来维护接受的socket fd。没有condition,它肯定会进入高cpu和死锁,再加上它工作得很好,非常低cpu,谢谢。
    最近更新 更多