【问题标题】:producer-consumer problem with pthreadspthreads的生产者-消费者问题
【发布时间】:2011-08-29 01:57:21
【问题描述】:

我正在尝试使用 pthread 和信号量来解决生产者-消费者问题,但看起来生产者线程没有生产,消费者线程也没有消费。似乎正在创建线程:

  /* Do actual work from this point forward */
  /* Create the producer threads */
  for(c1=1; c1<=argarray[1]; c1++)
  {
    pthread_create(&tid, &attr, producer, NULL);
    printf("Creating producer #%d\n", c1);    
  }

  /* Create the consumer threads */
  for(c1=1; c1<=argarray[2]; c1++)
  {
    pthread_create(&tid, &attr, consumer, NULL);
    printf("Creating consumer #%d\n", c1);    
  }

因为“创建生产者#x”和“创建消费者#x”被打印到屏幕上。但是,它不会从线程本身内部打印:

if(insert_item(item))
{
  fprintf(stderr, "Producer error.");
}
else
{
  printf("Producer produced %d\n", item);
}

同样适用于消费者线程。完整代码:

#include "buffer.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>

/* Create Buffer */
buffer_item buffer[BUFFER_SIZE];

/* Semaphore and Mutex lock */
sem_t cEmpty;
sem_t cFull;
pthread_mutex_t mutex;

/* Threads */
pthread_t tid; /* Thread ID */
pthread_attr_t attr; /* Thread attributes */

void *producer(void *param);
void *consumer(void *param);
void init();

/* Progress Counter */
int cg;

main(int argc, char *argv[])
{
  /* Variables */
  int argarray[3], c1;

  /* Argument counter checks */
  if(argc != 4)
  {
    fprintf(stderr, "usage: main [sleep time] [# of producer threads] [# of consumer threads]\n");
    return -1;
  }

  /* Get args from command line and change them into integers */
  argarray[0] = atoi(argv[1]); /* How long to sleep before ending */
  argarray[1] = atoi(argv[2]); /* Producer threads */
  argarray[2] = atoi(argv[3]); /* Consumer threads */

  /* Error check */
  if(argarray[1]<1)
  {
    fprintf(stderr, "argument 2 must be > 0\n");
    return -1;
  }
  if(argarray[2]<1)
  {
    fprintf(stderr, "argument 3 must be > 0\n");
    return -1;
  }    

  init();

  /* Do actual work from this point forward */
  /* Create the producer threads */
  for(c1=1; c1<=argarray[1]; c1++)
  {
    pthread_create(&tid, &attr, producer, NULL);
    printf("Creating producer #%d\n", c1);    
  }

  /* Create the consumer threads */
  for(c1=1; c1<=argarray[2]; c1++)
  {
    pthread_create(&tid, &attr, consumer, NULL);
    printf("Creating consumer #%d\n", c1);    
  }

  /* Ending it */
  sleep(argarray[0]);

  printf("Production complete.\n");
  exit(0);
}

void init()
{
  int c2;

  pthread_mutex_init(&mutex, NULL); /* Initialize mutex lock */
  pthread_attr_init(&attr); /* Initialize pthread attributes to default */
  sem_init(&cFull, 0, 0); /* Initialize full semaphore */
  sem_init(&cEmpty, 0, BUFFER_SIZE); /* Initialize empty semaphore */
  cg = 0; /* Initialize global counter */ 
  for(c2=0;c2<BUFFER_SIZE;c2++) /* Initialize buffer */
  {
    buffer[c2] = 0;
  }
}

void *producer(void *param)
{
  /* Variables */
  buffer_item item;

  while(1)
  { 
    sleep(rand());      
    item = (rand()); /* Generates random item */ 

    sem_wait(&cEmpty); /* Lock empty semaphore if not zero */
    pthread_mutex_lock(&mutex);

    if(insert_item(item))
    {
      fprintf(stderr, "Producer error."); 
    }
    else
    {
      printf("Producer produced %d\n", item); 
    }

    pthread_mutex_unlock(&mutex);
    sem_post(&cFull); /* Increment semaphore for # of full */
  }
}

void *consumer(void *param)
{
  buffer_item item;

  while(1)
  {
    sleep(rand());
    sem_wait(&cFull); /* Lock empty semaphore if not zero */
    pthread_mutex_lock(&mutex);
    if(remove_item(&item))
    {
      fprintf(stderr, "Consumer error."); 
    }
    else
    {
      printf("Consumer consumed %d\n", item);
    }

    pthread_mutex_unlock(&mutex);
    sem_post(&cEmpty); /* Increments semaphore for # of empty */
  }
}

int insert_item(buffer_item item)
{
  if(cg < BUFFER_SIZE) /* Buffer has space */
  {
    buffer[cg] = item;
    cg++;
    return 0;
  }
  else /* Buffer full */
  {
    return -1;
  }
}

int remove_item(buffer_item *item)
{
  if(cg > 0) /* Buffer has something in it */
  {
    *item = buffer[(cg-1)];
    cg--;
    return 0;
  }
  else /* Buffer empty */
  {
    return -1;
  }
}

终端输出:

user@isanacom:~/Desktop/PCthreads$ ./main 10 10 10
Creating producer #1
Creating producer #2
Creating producer #3
Creating producer #4
Creating producer #5
Creating producer #6
Creating producer #7
Creating producer #8
Creating producer #9
Creating producer #10
Creating consumer #1
Creating consumer #2
Creating consumer #3
Creating consumer #4
Creating consumer #5
Creating consumer #6
Creating consumer #7
Creating consumer #8
Creating consumer #9
Creating consumer #10
Production complete.

作为多线程的初学者,我确信它可能很简单,我忽略了,感谢您的帮助。

【问题讨论】:

    标签: c multithreading pthreads producer-consumer


    【解决方案1】:

    消费者和生产者都做了一个 sleep(rand()) ,它将在 0 到 MAX_INT 之间随机休眠几秒,在示例中,您给出的主线程将在 10 秒后终止。如果生产者的 rand() 值大于 10,他们将永远没有机会生产任何东西。

    【讨论】:

    • 不错的收获。实际上,尽管我认为他们已经通过从多个线程调用rand(不是线程安全的)来调用UB,可能同时。
    • 我可以使用我传入的睡眠参数来解决这个问题吗?
    • 您可以尝试将 sleep(rand()) 替换为 sleep(1)
    • 我可以在 main 函数的末尾调用 join 函数来解决这个问题吗?
    • @VaM999 是的,pthread_join() 是等待线程完成它正在做的事情的一种方式。然而在上面的例子中,消费者和生产者都没有完成。
    【解决方案2】:

    您不应在 main 函数结束时直接调用 exit。您应该首先为最后创建的线程调用 pthread_join,以保持主线程处于活动状态,直到其他线程死亡。

    【讨论】:

    • ..或使用其他一些方法来防止主线程退出。加入所有这些线程意味着出于这个原因保留对它们的引用,并在它们上保持一个 join() 循环。大皮塔饼。只等待输入字符,或使用 sleep() 循环或其他东西,比杂乱的 join() 更容易。
    【解决方案3】:

    您应该从使用线程数组开始...

    pthread_t tid[argarray[1] + argarray[2]];
    
    for(c1 = 0; c1 < argarray[1]; c1++)
    {
        pthread_create(&tid[c1], &attr, producer, NULL);
        printf("Creating producer #%d\n", c1); 
    }
    for(c1 = 0; c1 < argarray[2]; c1++)
    {
        pthread_create(&tid[c1 + argarray[1]], &attr, consumer, NULL);
        printf("Creating consumer #%d\n", c1); 
    }
    

    可能还有其他问题,但这是我看到的第一个问题......

    【讨论】:

    • 如果应用程序中没有使用线程ID,那么保留它是没有意义的。通过在所有创建循环中使用一个“&tid”来有效地转储它是可以的。
    • 甚至可能你可以将NULL作为pthread_t指针传递,但我没有尝试过。
    猜你喜欢
    • 2011-05-04
    • 1970-01-01
    • 2017-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-27
    • 2021-12-26
    相关资源
    最近更新 更多