【发布时间】:2020-08-16 14:13:52
【问题描述】:
while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}
上面的例子不是 MRE,但它非常简单:
我有一个带有循环的主线程,它不断地消耗来自消息队列的消息。一旦收到一条新消息,它就会存储在本地message_buffer 缓冲区中。然后,产生一个新线程来“处理”所述新消息,因此消息缓冲区的地址被传递到handle_message,新线程随后执行该地址。
问题
通常,两个线程会打印相同的消息,即使我可以 100% 确定队列中的消息不相同。
我不完全确定,但我想我明白为什么会这样:
假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。
在while 循环的第一次迭代中,消息将从队列中获取并保存到message_buffer。将产生一个新线程并将message_length 的地址传递给它。但是该线程可能不够快,无法将缓冲区的内容打印到流之前,下一条消息被消耗(在循环的下一次迭代中),message_buffer 的内容随后被覆盖。因此第一个和第二个线程现在打印相同的值。
我的问题是:解决这个问题的最有效方法是什么?我对并行编程和线程/pthreads 很陌生,而且我对不同的同步原语感到不知所措。
互斥问题
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}
我认为我当前的互斥锁实现不正确,因为线程仍在接收重复消息。主线程可以锁定互斥体,将消息消耗到缓冲区中,解锁互斥体,生成一个线程,但该线程仍然可能挂起,主线程可以再次重写缓冲区(因为缓冲区互斥体从未被新的线程),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?
【问题讨论】:
-
您的评估是正确的。但是你的程序结构有点缺陷。如果您只有一个缓冲区,那么产生一个线程来使用它的意义何在。没有并行化好处,因为使用单个缓冲区,主线程和所有其他线程将需要等待消息被消耗,然后才能将新消息写入缓冲区。当然,您可以使用互斥锁来保护缓冲区,但这样做有什么意义呢?这在功能上是正确的,因为只有一个线程会访问缓冲区,但不会有并发优势。
-
@kaylum 这是实际代码的淡化版本:在其中,每个线程读取缓冲区并保存它自己的副本,以便它可以在其他线程并行的同时继续处理它对他们自己的消息做同样的事情。我想在线程创建自己的副本所需的时间内保护消息缓冲区,但我想知道最好的方法是什么。
-
在这种情况下,您可以使用互斥锁。见pthread_muex_lock。根据您的实际应用程序,您可能会发现对缓冲区的争用可能会成为瓶颈。对于高性能应用程序,您的目标是尽量减少缓冲区的复制。
-
与其调用
mq_receive然后创建一个线程,不如反过来做:创建调用mq_receive的线程。整个问题都消失了。 (在实践中可以看到here。它使用了自定义队列模块,但概念是一样的。) -
@bool3max 您使用自动调整或仅计算线程数的线程池。几乎可以肯定,您很快就不需要担心,只需创建一些额外的线程即可。
标签: c multithreading pthreads race-condition