【问题标题】:Which synchronization primitive should I employ here?我应该在这里使用哪个同步原语?
【发布时间】:2020-08-16 14:13:52
【问题描述】:
while(1) {
     char message_buffer[SIZE];
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    puts((char *) message);
    return NULL;
}

上面的例子不是 MRE,但它非常简单:

我有一个带有循环的主线程,它不断地消耗来自消息队列的消息。一旦收到一条新消息,它就会存储在本地message_buffer 缓冲区中。然后,产生一个新线程来“处理”所述新消息,因此消息缓冲区的地址被传递到handle_message,新线程随后执行该地址。


问题

通常,两个线程会打印相同的消息,即使我可以 100% 确定队列中的消息不相同。


我不完全确定,但我想我明白为什么会这样:

假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。

while 循环的第一次迭代中,消息将从队列中获取并保存到message_buffer。将产生一个新线程并将message_length 的地址传递给它。但是该线程可能不够快,无法将缓冲区的内容打印到流之前,下一条消息被消耗(在循环的下一次迭代中),message_buffer 的内容随后被覆盖。因此第一个和第二个线程现在打印相同的值。


我的问题是:解决这个问题的最有效方法是什么?我对并行编程和线程/pthreads 很陌生,而且我对不同的同步原语感到不知所措。

互斥问题

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

while(1) {
     char message_buffer[SIZE];
     pthread_mutex_lock(&m);
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
     pthred_mutex_unlock(&m);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    pthread_mutex_lock(&m);
    strncpy(own_buffer, (char *) message, SIZE);
    pthread_mutex_unlock(&m);
    puts(own_buffer);
    return NULL;
}

我认为我当前的互斥锁实现不正确,因为线程仍在接收重复消息。主线程可以锁定互斥体,将消息消耗到缓冲区中,解锁互斥体,生成一个线程,但该线程仍然可能挂起,主线程可以再次重写缓冲区(因为缓冲区互斥体从未被新的线程),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?

【问题讨论】:

  • 您的评估是正确的。但是你的程序结构有点缺陷。如果您只有一个缓冲区,那么产生一个线程来使用它的意义何在。没有并行化好处,因为使用单个缓冲区,主线程和所有其他线程将需要等待消息被消耗,然后才能将新消息写入缓冲区。当然,您可以使用互斥锁来保护缓冲区,但这样做有什么意义呢?这在功能上是正确的,因为只有一个线程会访问缓冲区,但不会有并发优势。
  • @kaylum 这是实际代码的淡化版本:在其中,每个线程读取缓冲区并保存它自己的副本,以便它可以在其他线程并行的同时继续处理它对他们自己的消息做同样的事情。我想在线程创建自己的副本所需的时间内保护消息缓冲区,但我想知道最好的方法是什么。
  • 在这种情况下,您可以使用互斥锁。见pthread_muex_lock。根据您的实际应用程序,您可能会发现对缓冲区的争用可能会成为瓶颈。对于高性能应用程序,您的目标是尽量减少缓冲区的复制。
  • 与其调用mq_receive然后创建一个线程,不如反过来做:创建调用mq_receive的线程。整个问题都消失了。 (在实践中可以看到here。它使用了自定义队列模块,但概念是一样的。)
  • @bool3max 您使用自动调整或仅计算线程数的线程池。几乎可以肯定,您很快就不需要担心,只需创建一些额外的线程即可。

标签: c multithreading pthreads race-condition


【解决方案1】:

问题是您在保证线程已完成该内存之前结束了包含message_buffer 的循环。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    /****** Can't go beyond here until thread is done with message_buffer. ******/
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    /******* Only now can the caller loop back. ******/

    puts(own_buffer);
    return NULL;
}

您可以使用信号量或类似的。

static pthread_mutex_t mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond   = PTHREAD_COND_INITIALIZER;
static int             copied = 0;

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    // Wait until threads is done with message_buffer.
    pthread_mutex_lock(&mutex);
    while (!copied) pthread_cond_wait(&cond, &mutex);
    copied = 0;
    pthread_mutex_unlock(&mutex);
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    // Done with caller's buffer.
    // Signal caller to continue.
    pthread_mutex_lock(&mutex);
    copied = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    puts(own_buffer);
    return NULL;
}

(添加的块有效地执行信号量操作。有关更通用的实现,请参阅this answer 的最后一个 sn-p。)

但有一个更简单的解决方案:在创建线程之前制作副本。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
    if (ret) { /* error handling */ }
}

void * handle_message (void * message) {
    char * own_buffer = message;
    puts(own_buffer);
    free(own_buffer);
    return NULL;
}

【讨论】:

  • 谢谢!如果我有时间做的话,将这两种方法与基于线程池的方法进行基准测试会很有趣。
  • 这里有一个缺陷:如果pthread_create 失败,你会丢失指向strdup(message_buffer) 的指针,所以除非/* error handling */ 做一些像exit(1) 这样剧烈的事情,否则你有内存泄漏。跨度>
  • @Joseph Sible-Reinstate Monica,为简洁起见,有意省略了错误处理。该方法可以在避免内存泄漏的同时使用
  • 怎么样?您分配了内存,但不再有指向它的指针。您需要在调用 pthread_create 之前添加代码以保存它;仅向/* error handling */ 添加代码是不够的。
  • 通过将指针存储在一个变量中,并在pthread_create 失败时将该变量传递给free
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-09-29
  • 2018-04-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多