【Question title】: Freezing in a multithreaded program using condition variables
【Posted】: 2022-01-03 16:24:41
【Question description】:

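I have to implement the following pseudocode to parallelize a computationally expensive program (the Jacobi iteration method for a heat distribution problem).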

Pseudocode for the main thread:

Create N workers; each with a set of rows
While not max time step or threshold is reached 
   wait for all workers to finish their computation 
   check the max temp diff returned by all workers 
   swap matrices u and w
   if overall max temp diff > ε
     wake up all workers to execute next time step
   else threshold has reached
     wake up all workers and inform them to terminate
   endif
Wait for all workers and print their running statistics 
Get and print master running statistics
Update final_diff
Return no. of temp steps

Pseudocode for the worker threads:

Identify which set of rows to compute
While not terminate yet
   Compute the temp of all points in its set 
   Find the max temp diff in this set
   Signal master to test the diff 
   Wait for master instruction
   if instruction == stop 
      break the while loop
   else instruction == continue 
      continue the while loop
   endif
Get its running statistics and pass it to master 
Terminate

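I don't know why my program freezes. It is probably related to the following two points:

  1. In the main thread, how do I "wait for all workers to finish their computation"?
  2. In a worker thread, how do I "signal master to test the diff"?

My code is as follows:

Main thread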

int find_steady_state (void)
{
    // (3) Implement the thread creation and the main control logic here

    int thread_rtn;
    int * rptr;
    pthread_mutex_init(&mlock, NULL);
    pthread_mutex_init(&nlock, NULL);
    pthread_cond_init(&cond, NULL);
    pthread_cond_init(&mum, NULL);

    pthread_t thr_kids[thr_count];
    int i_value[thr_count];
    int rtn;
    double **temp;

    //Create N thread, according to the input
    for(int i = 0; i < thr_count; ++i){
        i_value[i] = i;
        thread_rtn = pthread_create(&thr_kids[i], NULL, &thr_func, (void*)&i_value[i]);
    }

    //run from 1 to max time step
    for(int its = 1; its < max_its; ++its){
        printf("Trial%d, count is %d\n", its, count);

        //find the maximum element in array sum_temp
        //sum_temp stored all the max temp in correspond location, eg: sum_temp[0] = max temp of first thread.
        max_temp = fmax(sum_temp[count], sum_temp[count-1]);

        //Wait for all workers to finish their work
        pthread_mutex_lock(&mlock);
        while(count != 0){
            pthread_cond_wait(&mum, &mlock);
        }

        pthread_mutex_lock(&mlock);
        printf("--- Main thread Wake up!! ---\n");

        //swap the matrix
        temp = u;
        u = w;
        w = temp;
        printf("--- Mother: The matrix is swapped ---\n");

        // test the value, if the value > EPSILON --> wake up all thread to execute next iterations.
        // else, wake up all thread and tell them to terminate.

        if(max_temp > EPSILON){
            stop = false;
            pthread_cond_broadcast(&cond);
            pthread_mutex_unlock(&nlock);
            printf("--- Mother: need to continue with temp = %f --- \n", max_temp);
            count = thr_count;
        }
        else{
            stop = true;
            pthread_cond_broadcast(&cond);
            pthread_mutex_unlock(&nlock);
            printf("--- Mother: Will Break with temp = %f --- \n", max_temp);
            break;

        }

        rtn = its;
    }

    /* Terminate the thread
       Print the running statistic
     */
    for(int i = 0; i < thr_count; ++i){
        pthread_join(thr_kids[i], (void **) &rtnArray);
        printf("Thread %d has completed - user: %.4f s, system: %.4f s\n", i, rtnArray[0], rtnArray[1]);
        // return iteration time
    }

    // update final_diff

    final_diff = max_temp;
    printf("--- Final temp: %f ---\n", final_diff);
    return rtn;
}

Worker thread

void *thr_func(void *arg) {

    // (2) Add the worker's logic here
    int x = *((int*)arg);
    //Split the computation evenly
    int start = x*(N/thr_count)+1;
    int end = (x+1)*N/thr_count;
    double diff = 0.0;
    count = thr_count;


    // used to return user time and sys time.
    struct rusage usage;

    if (end == N)
    {
        end = end - 1;
    }
    //While not terminate
    while(true){
        //find the max temp  diff in the set
        for(int i = start; i <end; i++){
            for(int j = 1; j < N-1;j++){
                w[i][j] = 0.25 * (u[i-1][j] + u[i+1][j] + u[i][j-1] + u[i][j+1]);
                if(fabs(w[i][j] - u[i][j]) > diff)
                    diff = fabs(w[i][j] - u[i][j]);
            }
        }

        if(diff >= sum_temp[x]){
            sum_temp[x] = diff;
            printf("new temp for trial %d = %f\n", x, sum_temp[x]);
        }
        //Signal master thread to test the diff
        count--;
        if(count == 0){
            pthread_cond_signal(&mum);
            pthread_mutex_unlock(&mlock);
        }
        pthread_mutex_lock(&nlock);
        /*Wait for the master instruction
        if stop == true --> main thread tells workers can stop work
        else stop == false --> run the loop again.
        */

        pthread_cond_wait(&cond, &nlock);

        //receive the instruction
        // stop is bool type
        if(stop){
            printf("--- STOP!!! --- \n");
            break;
        }
        else if(!stop){
            printf("--- Continue ---\n");
            continue;
        }

    }

    printf("Count = %d\n", count);
    printf("\nThread%d: The max temp diff for the set between %d and %d is: %f\n", x,  start, end, diff);

    /* terminate and return running statistic
       send to main thread
    */
    rtnArray = (float*) malloc(sizeof(float) *2);
    rtnArray[0] = (usage.ru_utime.tv_sec + usage.ru_utime.tv_usec/1000000.0);
    rtnArray[1] = (usage.ru_stime.tv_sec + usage.ru_stime.tv_usec/1000000.0);
    pthread_exit(rtnArray);
    return rtnArray;
}

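【Comments】:

  • I don't see why you need a mutex between the workers and main; a simple join() would be enough here. Also, the workers start out with the mutex locked by the main thread and then wait for a free mutex that never comes, so they never run.
  • My thinking was that the workers need to run several times (once per iteration), so I tried to use condition variables. Thanks for the suggestion anyway!! @tofro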
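
For readers who, like the asker, want workers that run once per iteration and are coordinated with condition variables, here is a minimal sketch of the handshake the pseudocode describes. It is not the asker's code: the names handshake, worker_report_and_wait, master_wait_all, master_release and handshake_demo are hypothetical, and the "computation" is a stand-in counter. The points it tries to show are that the shared counter is only touched while the mutex is held, that every pthread_cond_wait sits inside a loop that re-checks a predicate, and that a mutex is only unlocked by the thread that locked it.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t workers_done;  /* the master waits on this */
    pthread_cond_t next_round;    /* the workers wait on this */
    int pending;                  /* workers still computing this round */
    unsigned long round;          /* bumped once per master decision */
    bool stop;                    /* the master's instruction */
} handshake;

/* Worker side: report completion, then sleep until the master decides.
 * Returns true when the instruction is "stop". */
static bool worker_report_and_wait(handshake *h)
{
    pthread_mutex_lock(&h->lock);
    unsigned long seen = h->round;
    if (--h->pending == 0)
        pthread_cond_signal(&h->workers_done);
    while (h->round == seen)      /* predicate loop: survives spurious wakeups */
        pthread_cond_wait(&h->next_round, &h->lock);
    bool stop = h->stop;
    pthread_mutex_unlock(&h->lock);
    return stop;
}

/* Master side: block until every worker has reported. */
static void master_wait_all(handshake *h)
{
    pthread_mutex_lock(&h->lock);
    while (h->pending != 0)
        pthread_cond_wait(&h->workers_done, &h->lock);
    pthread_mutex_unlock(&h->lock);
}

/* Master side: publish the instruction and wake all workers. */
static void master_release(handshake *h, int nworkers, bool stop)
{
    pthread_mutex_lock(&h->lock);
    h->pending = nworkers;        /* reset before any worker can decrement */
    h->stop = stop;
    h->round++;
    pthread_cond_broadcast(&h->next_round);
    pthread_mutex_unlock(&h->lock);
}

typedef struct { handshake *h; int steps; } worker_arg;

static void *handshake_worker(void *p)
{
    worker_arg *a = p;
    do {
        a->steps++;               /* stand-in for computing one time step */
    } while (!worker_report_and_wait(a->h));
    return NULL;
}

/* Runs `rounds` time steps; returns 1 if every worker ran exactly that many. */
int handshake_demo(int nworkers, int rounds)
{
    assert(nworkers > 0 && rounds > 0);
    handshake h = { .pending = nworkers, .round = 0, .stop = false };
    pthread_mutex_init(&h.lock, NULL);
    pthread_cond_init(&h.workers_done, NULL);
    pthread_cond_init(&h.next_round, NULL);
    pthread_t *tid = malloc(nworkers * sizeof *tid);
    worker_arg *args = calloc(nworkers, sizeof *args);
    assert(tid && args);
    for (int k = 0; k < nworkers; k++) {
        args[k].h = &h;
        int rc = pthread_create(&tid[k], NULL, handshake_worker, &args[k]);
        assert(rc == 0);
    }
    for (int r = 1; r <= rounds; r++) {
        master_wait_all(&h);
        /* every worker is blocked here: safe to check diffs and swap u and w */
        master_release(&h, nworkers, r == rounds);
    }
    int ok = 1;
    for (int k = 0; k < nworkers; k++) {
        pthread_join(tid[k], NULL);
        if (args[k].steps != rounds) ok = 0;
    }
    pthread_mutex_destroy(&h.lock);
    pthread_cond_destroy(&h.workers_done);
    pthread_cond_destroy(&h.next_round);
    free(tid);
    free(args);
    return ok;
}
```

Note that pending is initialised by the master before any thread is created, rather than by each worker as in the posted code, and that the round counter lets a worker tell a real wake-up from a spurious one.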

Tags: c multithreading pthreads mutex race-condition


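【Solution 1】:

Q: In the main thread, how do I "wait for all workers to finish their computation"?
A: pthread_barrier_wait (see also pthread_barrier_init)

Q: In a worker thread, how do I "signal master thread to test the diff"?
A: No signal is needed. Again, if the main thread waits at the barrier, it continues once all threads have reached that barrier.

Your code needs only two barriers, and no mutexes or condition variables.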
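
To make the two-barrier idea concrete, here is a rough sketch under some assumptions of mine: a small fixed grid (N = 16), a hot top edge as the boundary condition, and a hypothetical function find_steady_state_sketch standing in for the asker's find_steady_state. The barrier names computed and decided are also made up. Both barriers are initialised for nthreads + 1 participants, so the master takes part as well. pthread_barrier_t is an optional POSIX feature and is not available on every platform (macOS, for example, does not provide it).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define N 16          /* grid size: an assumption for this sketch */
#define EPSILON 1e-3  /* convergence threshold: also assumed */
#define MAX_ITS 10000

static double grid_a[N][N], grid_b[N][N];
static double (*u)[N], (*w)[N];     /* current and next grid */
static pthread_barrier_t computed;  /* every worker has finished a step */
static pthread_barrier_t decided;   /* master has swapped and set stop */
static double *local_diff;          /* one slot per worker */
static int nworkers;
static bool stop;

static void *jacobi_worker(void *arg)
{
    int id = *(int *)arg;
    int start = 1 + id * (N - 2) / nworkers;      /* first interior row */
    int end = 1 + (id + 1) * (N - 2) / nworkers;  /* one past the last row */

    while (true) {
        double diff = 0.0;                        /* reset every time step */
        for (int i = start; i < end; i++) {
            for (int j = 1; j < N - 1; j++) {
                w[i][j] = 0.25 * (u[i-1][j] + u[i+1][j] + u[i][j-1] + u[i][j+1]);
                double d = w[i][j] - u[i][j];
                if (d < 0) d = -d;
                if (d > diff) diff = d;
            }
        }
        local_diff[id] = diff;
        pthread_barrier_wait(&computed);  /* takes the place of "signal master" */
        pthread_barrier_wait(&decided);   /* takes the place of "wait for instruction" */
        if (stop)
            break;
    }
    return NULL;
}

/* Hypothetical counterpart of find_steady_state(); returns the step count. */
int find_steady_state_sketch(int nthreads, double *final_diff)
{
    assert(nthreads > 0 && final_diff != NULL);
    nworkers = nthreads;
    stop = false;
    u = grid_a;
    w = grid_b;
    for (int i = 0; i < N; i++)
        for (int j = 0; j < N; j++)
            grid_a[i][j] = grid_b[i][j] = (i == 0) ? 100.0 : 0.0;  /* hot top edge */

    pthread_t *tid = malloc(nthreads * sizeof *tid);
    int *ids = malloc(nthreads * sizeof *ids);
    local_diff = calloc(nthreads, sizeof *local_diff);
    assert(tid && ids && local_diff);
    /* nthreads workers plus the master take part in each barrier */
    pthread_barrier_init(&computed, NULL, nthreads + 1);
    pthread_barrier_init(&decided, NULL, nthreads + 1);
    for (int k = 0; k < nthreads; k++) {
        ids[k] = k;
        int rc = pthread_create(&tid[k], NULL, jacobi_worker, &ids[k]);
        assert(rc == 0);
    }

    int its = 0;
    double max_diff = 0.0;
    while (!stop) {
        pthread_barrier_wait(&computed);       /* wait for all workers */
        its++;
        max_diff = 0.0;
        for (int k = 0; k < nthreads; k++)
            if (local_diff[k] > max_diff) max_diff = local_diff[k];
        double (*tmp)[N] = u; u = w; w = tmp;  /* swap matrices */
        stop = (max_diff <= EPSILON) || (its >= MAX_ITS);
        pthread_barrier_wait(&decided);        /* release the workers */
    }

    for (int k = 0; k < nthreads; k++)
        pthread_join(tid[k], NULL);
    pthread_barrier_destroy(&computed);
    pthread_barrier_destroy(&decided);
    free(tid); free(ids); free(local_diff);
    *final_diff = max_diff;
    return its;
}
```

The master only touches u, w and stop between the two barriers, while the workers only read them outside that window, so no extra locking is needed in this sketch. Collecting per-thread statistics with getrusage is left out to keep it short.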

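【Comments】:

  • The assignment requires using condition variables and mutexes. Can I implement the barrier approach with a c.v. & lock? Thanks anyway!!
  • Well, a barrier is basically a spin lock on a counter (the number of threads) that is decremented step by step. Once it reaches zero, the lock is released (unlocked) and all threads continue. With a condition variable, each thread decrements the counter and, if it is not zero, goes into wait mode. The thread that sets the counter to zero notifies all threads to wake up. So whether you use a barrier or a mutex and condition variable does not make much difference (only if the roles are not separated, e.g. the main thread does not participate in the barrier).
  • I saw this post: *.com/questions/70053844/… Is it similar to implementing a barrier?
  • Again, a barrier contains a global counter (which must be protected by a mutex), and every thread that wants to take part in the barrier must decrement the counter. If the counter is not zero (yet), the thread must suspend itself somehow (yield, sleep, or wait on a condition variable). The thread that sets the counter to zero is responsible for notifying (waking up) all the other threads (if they are not already continuously checking the counter).
  • I have updated the code, could you take a look? Thanks!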
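
The counter-based barrier described in the comments above could look roughly like the following sketch, built only from a mutex and a condition variable. The type cv_barrier and the functions cv_barrier_init, cv_barrier_wait, cv_barrier_destroy and cv_barrier_demo are hypothetical names, not a POSIX API. One detail the comments do not mention is the generation counter: it is my addition so that the barrier can be reused every iteration without a fast thread slipping through the next round early. The counter here counts arrivals upward instead of decrementing, which is equivalent.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t all_arrived;
    int parties;               /* threads that must arrive */
    int waiting;               /* threads that have arrived this round */
    unsigned long generation;  /* bumped each time the barrier opens */
} cv_barrier;

static void cv_barrier_init(cv_barrier *b, int parties)
{
    assert(parties > 0);
    pthread_mutex_init(&b->lock, NULL);
    pthread_cond_init(&b->all_arrived, NULL);
    b->parties = parties;
    b->waiting = 0;
    b->generation = 0;
}

static void cv_barrier_wait(cv_barrier *b)
{
    pthread_mutex_lock(&b->lock);
    unsigned long gen = b->generation;
    if (++b->waiting == b->parties) {
        /* last thread to arrive: open the barrier for everyone */
        b->waiting = 0;
        b->generation++;
        pthread_cond_broadcast(&b->all_arrived);
    } else {
        /* wait until the generation changes, not merely until woken */
        while (gen == b->generation)
            pthread_cond_wait(&b->all_arrived, &b->lock);
    }
    pthread_mutex_unlock(&b->lock);
}

static void cv_barrier_destroy(cv_barrier *b)
{
    pthread_cond_destroy(&b->all_arrived);
    pthread_mutex_destroy(&b->lock);
}

typedef struct {
    cv_barrier *barrier;
    int *slots;                /* one slot per thread, shared */
    int id, nthreads, rounds, errors;
} demo_arg;

static void *demo_thread(void *p)
{
    demo_arg *a = p;
    for (int r = 1; r <= a->rounds; r++) {
        a->slots[a->id] = r;              /* "compute" this round */
        cv_barrier_wait(a->barrier);      /* everyone has written */
        for (int k = 0; k < a->nthreads; k++)
            if (a->slots[k] != r)
                a->errors++;              /* a thread ran ahead or lagged */
        cv_barrier_wait(a->barrier);      /* everyone has read */
    }
    return NULL;
}

/* Returns the number of ordering violations observed; 0 is expected. */
int cv_barrier_demo(int nthreads, int rounds)
{
    cv_barrier b;
    cv_barrier_init(&b, nthreads);
    pthread_t *tid = malloc(nthreads * sizeof *tid);
    demo_arg *args = calloc(nthreads, sizeof *args);
    int *slots = calloc(nthreads, sizeof *slots);
    assert(tid && args && slots);
    for (int k = 0; k < nthreads; k++) {
        args[k] = (demo_arg){ &b, slots, k, nthreads, rounds, 0 };
        int rc = pthread_create(&tid[k], NULL, demo_thread, &args[k]);
        assert(rc == 0);
    }
    int total = 0;
    for (int k = 0; k < nthreads; k++) {
        pthread_join(tid[k], NULL);
        total += args[k].errors;
    }
    cv_barrier_destroy(&b);
    free(tid); free(args); free(slots);
    return total;
}
```

In the Jacobi setting, two such barriers sized for the workers plus the master would play the same role as two pthread_barrier_t objects while still satisfying a requirement to use mutexes and condition variables.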