【问题标题】:Thread synchronization with boost::condition_variable使用 boost::condition_variable 进行线程同步
【发布时间】:2011-04-15 14:55:31
【问题描述】:

我正在做一些关于 C++ 多线程的实验,但我不知道如何解决一个问题。假设我们有线程池,它使用现有线程处理用户请求并在没有可用线程时创建新线程。我创建了 command_queue 线程安全类,它具有 push 和 pop 方法。 pop 在队列为空时等待,仅在命令可用或发生超时时返回。现在是时候实现线程池了。这个想法是让空闲线程休眠一段时间,如果在那段时间之后无事可做,则终止线程。这是实现

command_queue::handler_t handler;
while (handler = tasks.pop(timeout))
{
    handler();
}

如果发生超时,我们在这里退出线程过程。这很好,但是新线程的创建存在问题。假设我们已经有 2 个线程处理用户请求,它们目前正在工作,但我们需要异步执行一些其他操作。 我们叫

thread_pool::start(some_operation);

应该启动新线程,因为没有可用的空闲线程。当线程可用时,它会在条件变量上调用timed_wait,因此我们的想法是检查是否有线程在等待。

if (thread_are_free_threads) // ???
   condition.notify_one();
else
   create_thread(thread_proc);

但是如何检查呢?文档说,如果没有等待线程 notify_one 什么也不做。如果我可以检查它是否什么都没做,那将是一个解决方案

if (!condition.notify_one()) // nobody was notified
   create_thread(thread_proc);

据我所知,没有办法检查。

感谢您的回答。

【问题讨论】:

  • 你知道用小cmets写代码通常比用少量代码的英文要清晰得多。将完整代码一次性发布。

标签: c++ boost threadpool


【解决方案1】:

您需要创建另一个变量(可能是一个信号量),它知道有多少线程正在运行,然后您可以在调用 notify 之前检查它并在需要时创建一个新线程。

另一个更好的选择是在线程超时时不让它们退出。他们应该活着等待通知。而不是在通知超时时退出,而是检查变量以查看程序是否仍在运行或是否“关闭”,如果它仍在运行,则再次开始等待。

【讨论】:

  • boost::thread_group可以简化后者,即你可以在他们等待的时候调用interrupt_all()请求关机。
  • 这个想法是做一个智能线程池,它不会让不必要的线程保持活动状态。所以这不会解决问题,它只是通过改变任务来避免它:) 我会尝试信号量,它们应该会慢一点,但如果没有其他选择。
  • 保持线程活动并等待条件变量不是问题。它不消耗运行时资源,堆栈空间小。另一方面,创建线程很昂贵。
  • 这就是为什么我想在杀死线程之前等待一段时间,但是假设我们有一个服务器应用程序,它接收请求并异步处理它们,如果我们不杀死线程高峰时刻,我们最终可能会拥有几十个完全没有做任何事情的线程。从我的角度来看,这并不好。我正在尝试制作类似于 .NET 线程池的东西。
【解决方案2】:

更典型的线程池如下所示:

Pool::Pool()
{
    runningThreads = 0;
    actualThreads  = 0;
    finished       = false;
    jobQue.Init();

    mutex.Init();
    conditionVariable.Init();

    for(int loop=0; loop < threadCount; ++loop) { startThread(threadroutine); }
}

Pool::threadroutine()
{

    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        ++ actualThreads;
        ++ runningThreads;
    }
    while(!finished)
    {
         Job job;
         {
             RAIILocker doLock(mutex);

             while(jobQue.empty())
             {
                 // This is the key.
                 // Here the thread is suspended (using zero resources)
                 // until some other thread calls the notify_one on the
                 // conditionVariable. At this point exactly one thread is release
                 // and it will start executing as soon as it re-acquires the lock
                 // on the mutex.
                 //
                 -- runningThreads;
                 conditionVariable.wait(mutex);
                 ++ runningThreads;
             }
             job = jobQue.getJobAndRemoveFromQue();
         }
         job.execute();
    }
    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        -- actualThreads;
        -- runningThreads;
    }
}

Pool::AddJob(Job job)
{
    RAIILocker doLock(mutex);

    // This is where you would check to see if you need more threads.
    if (runningThreads == actualThreads) // Plus some other conditions.
    {
        // increment both counts. When it waits we decrease the running count.
        startThread(threadroutine);
    }
    jobQue.push_back(job);
    conditionVariable.notify_one();  // This releases one worker thread
                                     // from the call to wait() above.
                                     // Note: The worker thread will not start
                                     //       until this thread releases the mutex.
}

【讨论】:

  • @ledokol:我确定有,但您似乎正在尝试构建自己的。这就是它应该的样子。
  • 是的,这只是为了实验目的,但 boost::asio 仍然不是我想要构建的。
  • @ledokol:以上是实现线程池的比较标准的模式。如果你需要别的东西,那么你需要更好地解释你需要什么(而不是你想要实现的)。
  • 好的。线程池初始化后没有任何线程。当用户请求异步操作时,线程池创建一个新线程并执行操作。操作完成后,线程会休眠一段时间并等待新的用户请求。超时线程被杀死后。因此线程池将任务分配给可用的空闲线程,如果没有可用的空闲线程,它会创建处理请求的新线程。当没有空闲线程时,用户请求不应留在队列中,应立即处理。
  • 稍后我可能会添加一些限制,例如 MaxThreadCount、MinThreadCount,通过配置这些值,我可以获得您描述的线程池。
【解决方案3】:

我认为您需要重新考虑您的设计。在发牌员线程向玩家线程分发工作的简单模型中,发牌员将工作放入消息队列中,并让其中一名玩家在有机会时接手工作。

在您的情况下,经销商正在积极管理线程池,因为它保留了有关哪些玩家线程空闲以及哪些线程忙碌的知识。由于荷官知道哪个玩家处于空闲状态,荷官可以主动将空闲任务传递给玩家,并使用简单的信号量(或 cond var)向玩家发出信号——每个玩家都有一个信号量。在这种情况下,经销商主动销毁空闲线程可能是有意义的,方法是给线程一个kill 我自己 的工作。

【讨论】:

  • 我的情况和你描述的完全一样。也许我对此还不够清楚。我只是将作业推入队列,工作线程正在等待发布新作业并醒来,因为他们看到有事情要做。但是为了满足我的需求,经销商必须在没有可用线程时创建线程,问题是经销商不知道此刻是否有空闲线程。问题是检查这个。你可以把它想象成 2 种线程——BOSS 和 WORKERS,BOSS 给任务,worker 处理它们,但是当没有空闲的 worker 时,BOSS 必须雇人
  • 消息队列模式是BOSS在布告栏上发布工作,工人完成前一个工作后接受工作。重要的是BOSS不知道工人有多忙。我的模式是 BOSS 将任务分配给个别工人(他知道他的名字),如果他找不到他知道的工作人员,则雇用另一名工人。
  • 可能我没看清楚你。你写了“既然庄家知道哪个玩家闲着”,但它怎么知道呢?
  • "如果找不到他知道的工作人员,则雇用另一名工人" - 是的,这就是我想要的,问题是我不知道如何找出是否有闲置的工人。
  • 你有一个空闲工人的名单。当一个 WORKER 完成它的工作时,它会将自己添加到空闲的 WORKERS 列表中(当然是原子的)
【解决方案4】:

现在我找到了一种解决方案,但它并不那么完美。 我有一个名为 free 的 volatile 成员变量——它存储池中的空闲线程数。

void thread_pool::thread_function()
{
    free++;
    command_queue::handler_t handler;
    while (handler = tasks.pop(timeout))
    {
        free--;
        handler();
        free++;
    }
    free--;
}

当我将任务分配给线程时,我会做这样的事情

if (free == 0)
    threads.create_thread(boost::bind(&thread_pool::thread_function, this));

同步仍然存在问题,因为如果上下文将在空闲后切换--在thread_function 我们可能会创建一个新线程,我们实际上并不需要,但由于任务队列是线程安全的没问题,这只是不必要的开销。您能提出解决方案吗?您对此有何看法?也许最好让它保持原样,然后在这里再进行一次同步?

【讨论】:

    【解决方案5】:

    另一个想法。您可以查询消息队列的长度。如果时间过长,请创建一个新的工作人员。

    【讨论】:

    • 有可能,但这并不能保证所有任务都会立即启动。在这种情况下,您可能会请求执行某个操作,该操作将在其他一些操作完成后执行。这并不能解决问题。经销商希望所有任务立即执行,没有任何延迟,即使它在使用更多线程而不是处理器/内核方面无效。
    • 那就看看我的其他回答
    猜你喜欢
    • 1970-01-01
    • 2018-03-10
    • 2011-11-03
    • 2015-09-23
    • 1970-01-01
    • 1970-01-01
    • 2014-07-21
    • 1970-01-01
    相关资源
    最近更新 更多