【问题标题】:Problem with thread synchronization and condition variables in CC中线程同步和条件变量的问题
【发布时间】:2011-07-28 10:38:49
【问题描述】:

我有三个线程,一个是主线程,另外两个是工作线程。第一个线程,当有工作要做时唤醒两个线程中的一个。每个线程在被唤醒时都会执行一些计算,并且在执行此操作时,如果它发现有更多工作要做,则可以唤醒另一个工作线程或简单地决定自己完成工作(例如,通过将工作添加到本地队列)。 当工作线程有工作要做时,主线程必须等待工作完成。我已经用条件变量实现了如下(这里报告的代码隐藏了很多细节,请询问是否有无法理解的东西):

主线程(伪代码):

//this function can be called from the main several time. It blocks the main thread till the work is done.
void new_work(){

//signaling to worker threads if work is available

    //Now, the threads have been awakened, it's time to sleep till they have finished.
    pthread_mutex_lock(&main_lock);
    while (work > 0)    //work is a shared atomic integer, incremented each time there's work to do and decremented when finished executing some work unit
       pthread_cond_wait(&main_cond);
    pthread_mutex_unlock(&main_lock);

}

工作线程:

while (1){

   pthread_mutex_lock(&main_lock);
    if (work == 0)
       pthread_cond_signal(&main_cond);
    pthread_mutex_unlock(&main_lock);  

    //code to let the worker thread wait again -- PROBLEM!

   while (I have work to do, in my queue){
       do_work()
   }

}

问题是:当工作线程唤醒主线程时,我不确定工作线程是否会调用等待以将自己置于等待新工作的状态。即使我用另一个条件变量实现这个等待,也可能发生主线程是唤醒的,做一些工作直到他必须唤醒尚未调用等待的线程......这可以导致不好的结果。我尝试了几种方法来解决这个问题,但我找不到解决方案,也许有一个明显的方法可以解决它,但我错过了。

您能提供解决此类问题的方案吗?我使用的是 C 语言,我可以使用您认为适合的任何同步机制,例如 pthreads 或 posix 信号量。

谢谢

【问题讨论】:

  • 您是否需要专门向主线程发出作业已完成的信号(除了生成下一个工作项之外,主线程是否需要执行一些特殊的“工作完成”操作)?或者您使用该信号只是为了让主线程知道它可以产生下一个工作负载?
  • 我必须向主线程发出信号,让它知道它可以继续产生更多的工作量。它必须等到所有工作负载都已处理完毕并且线程再次等待。请务必注意,工作线程可以自行产生更多工作负载,如问题中所述。
  • 另外,我在未实现 pthread 屏障的 OSX 上进行开发也很重要。

标签: c multithreading synchronization pthreads


【解决方案1】:

处理这个问题的常用方法是有一个工作队列并保护它免受上溢和下溢。像这样的东西(我省略了“pthread_”前缀):

mutex queue_mutex;
cond_t queue_not_full, queue_not_empty;

void enqueue_work(Work w) {
    mutex_lock(&queue_mutex);
    while (queue_full())
        cond_wait(&queue_not_full, &queue_mutex);
    add_work_to_queue(w);
    cond_signal(&queue_not_empty);
    mutex_unlock(&queue_mutex);
}

Work dequeue_work() {
    mutex_lock(&queue_mutex);
    while (queue_empty())
        cond_wait(&queue_not_empty, &queue_mutex);
    Work w = remove_work_from_queue();
    cond_signal(&queue_not_full);
    mutex_unlock(&queue_mutex);
}

注意这些函数之间的对称性:入队 出队,空 满,not_empty 未满。

这为任意数量的线程产生工作和任意数量的线程消耗工作提供了一个线程安全的有界大小队列。 (实际上,这是使用条件变量的典型示例。)如果您的解决方案看起来不完全一样,它可能应该非常接近......

【讨论】:

  • 我知道这是正确的方案,但我不喜欢这个解决方案的原因是,毕竟,您在队列上执行的每个操作都使用互斥锁。由于我有一个无锁队列,这是我想要避免的那种模式。附言抱歉回复晚了。
【解决方案2】:

如果您希望主线程将工作分配给其他两个线程,请等到两个线程都完成工作后再继续,您也许可以通过屏障来完成。

屏障是一种同步结构,您可以使用它使线程在代码中的某个点等待,直到一定数量的线程都准备好继续前进。本质上,您初始化了一个 pthread 屏障,表示 x 个线程必须等待它,然后才能继续。当每个线程完成其工作并准备继续时,它将在屏障上等待,一旦 x 个线程达到屏障,它们都可以继续。

在您的情况下,您可能可以执行以下操作:

pthread_barrier_t barrier;
pthread_barrier_init(&barrier, 3);

master()
{
  while (work_to_do) {
    put_work_on_worker_queues();
    pthread_barrier_wait(&barrier);
  }
}

worker()
{
  while(1) {
    while (work_on_my_queue()) {
      do_work();
    }
    pthread_barrier_wait(&barrier);
  }
}

这应该让你的主线程发出工作,然后等待两个工作线程完成他们被赋予的工作(如果有的话),然后再继续。

【讨论】:

  • 您的回答中没有提到细节。在您编写的代码中,当工作线程可以在 barrier_wait 之后继续运行时,它会卡在一段时间(1),直到主线程添加更多工作。你可以说这是一个细节,但这对我来说很重要:我无法找到一种方法来保证工作线程会在线程可以继续执行其他操作之前进入睡眠状态。
  • 我不明白您要保证什么。在我的示例中,应在主线程向工作队列添加一些工作并在屏障上等待后立即唤醒工作线程。唤醒后,工作线程将检查其队列。如果有工作,它会先完成工作,然后回到屏障上睡觉,如果没有工作,它会直接回到屏障上睡觉。工作线程将无限期地在屏障上休眠,直到主线程添加新工作并再次遇到屏障。
  • 在我的示例中可能发生的情况是,一个线程完成了为其安排的所有工作,另一个线程根据其本地决定决定将工作交给第一个线程。在这种情况下,我认为障碍不适合。但是,我实际上是在没有 pthread 屏障实现的 OS X 上编写代码。
  • 我认为你应该看看@Nemo 的解决方案,在这种情况下。
【解决方案3】:

您能否拥有由主线程管理的“新作业”队列?主线程可以一次将 1 个作业分发给每个工作线程。主线程还将监听工人完成的工作。如果工作线程找到需要做的新工作,只需将其添加到“新工作”队列中,主线程就会分发它。

伪代码:

JobQueue NewJobs;
Job JobForWorker[NUM_WORKERS];

workerthread()
{
  while(wait for new job)
  {
    do job (this may include adding new jobs to NewJobs queue)
    signal job complete to main thread
  }
}

main thread()
{
  while(whatever)
  {
    wait for job completion on any worker thread
    now a worker thread is free put a new job on it
  }
}

【讨论】:

  • 抱歉,我看不到您的回答如何解决我的问题。可以更改我提出的方案或尝试不同的解决方案,但我错过了您的观点。
  • 你能放更多代码吗?您包含的代码并没有真正告诉我任何事情! - 因此,我认为我不理解您最初的问题(如果我的建议没有回答它)。
  • 我不确定我的方案如何不能解决您的问题,但我必须做出某些假设。你能用不同的话解释为什么那条线特别是“问题”吗? - 发生了什么问题?
【解决方案4】:

我相信您在这里看到的是 producer-consumer problem 的变体。您正在做的是编写计数信号量的临时实现(用于提供的不仅仅是互斥)。

如果我没看错你的问题,那么你要做的就是让工作线程阻塞,直到有一个可用的工作单元,然后在它可用时执行一个工作单元。您的问题在于有太多可用工作并且主线程试图解除阻塞已经在工作的工作人员。我将按如下方式构建您的代码。

sem_t main_sem;
sem_init(&main_sem, 0, 0);

void new_work() {
    sem_post(&main_sem);
    pthread_cond_wait(&main_cond);
}

void do_work() {
    while (1) {
        sem_wait(&main_sem);
        // do stuff
        // do more stuff
        pthread_cond_signal(&main_sem);
    }
}

现在,如果工作线程产生更多工作,那么他们可以简单地将sem_post 传递给信号量,并简单地将pthread_cond_signal 推迟到所有工作完成。

但是请注意,如果您确实需要主线程在工作者工作时总是阻塞,那么当您可以调用一个完成工作的函数时,将工作推送到另一个线程是没有用的。

【讨论】:

  • 您了解问题所在,但是您提出的解决方案存在问题。假设工作线程为 2。 main 调用的 cond_wait 正在等待来自一个线程的恰好一个信号,如果一个线程向 main 发出信号而另一个线程仍在处理一些工作会发生什么?第二个线程最终将向主线程发出信号,但主线程可能还没有调用 cond_wait。主线程必须等到所有其他线程完成所有可用的工作负载。
猜你喜欢
  • 2017-09-27
  • 2018-09-11
  • 2018-03-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-05-13
  • 2021-09-29
相关资源
最近更新 更多