【问题标题】:Prevent msgrcv from waiting indefinitely防止 msgrcv 无限期等待
【发布时间】:2021-03-09 19:17:03
【问题描述】:

我有一个多线程程序,它的 2 个线程通过消息队列相互通信。第一个线程(发送者)定期发送消息,而第二个线程(接收者)处理信息。

发件人有类似这样的代码:

// Create queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);

// Create message and send
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

在接收线程上,我这样做:

// Subscribe to queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);

struct request_msg req_msg;

while(running)
{
    msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
    // Do sth with the message
}

如您所见,接收器位于一个由名为“running”的全局变量控制的 while 循环中。如果在进程中遇到错误,错误处理程序会将布尔值设置为 false。这在大多数情况下都有效,但如果在能够向队列发送消息之前发生错误,接收方将不会退出 while 循环,因为它会在继续之前等待消息,从而检查运行变量。这意味着它将永远挂在那里,因为发送者在运行时的其余时间不会发送任何东西。

我想避免这种情况,但我不知道如何让 msgrcv 知道它不能期待更多消息。如果我终止队列,我无法了解 msgrcv 的行为方式,假设这是最简单的版本。也许超时或发送某种终止消息(可能使用消息结构的 mtype 成员)也是可能的。

请让我知道解决这个问题最可靠的方法是什么。谢谢!

编辑:根据建议,我重新编写了代码以使信号处理程序动作原子化。

#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

struct message 
{
    uint64_t iteration;
    char req_time[28];
};

static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;

pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;


static void
termination_handler(int signum)
{
    running = false;
}


static void 
alarm_handler(int signum)
{
    work = true;
}


static void
write_msg(void)
{
    // Reset the alarm interval
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        raise(SIGTERM);
        return;
    }

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    printf("\nLoop count: %lu\n", loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    char tmbuf[64];
    time_t nowtime = current_time.tv_sec;
    struct tm *nowtm = localtime(&nowtime);
    strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


static void* 
process_msg(void *args)
{
    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        pthread_cond_wait(&data_updated_cv, &mutexmsg);
        printf("Condition fulfilled\n");

        if(!running)
        {
            break;
        }

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        char tmbuf[64];
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm *nowtm = localtime(&nowtime);
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);

        // something that takes longer than the interval time
        // sleep(1);

        printf("[%s] Req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}

【问题讨论】:

  • 如果没有待处理的消息,您可以告诉msgrcv() 立即返回错误消息。请参阅手册页。
  • 而且较新的 POSIX 消息队列支持超时等待消息。
  • @Shawn 感谢您的意见。如果没有消息,我不希望它立即返回。原因是发送者以一秒的间隔写入消息队列,而接收者必须比这更快。所以等到下一条消息到达是期望的行为。不过,我肯定会研究 POSIX 消息队列。

标签: c ipc message-queue


【解决方案1】:

回答问题中的新提案

  • 应在主线程的事件循环中重新配置循环计时器以获得更好的可见性(主观建议);
  • 当辅助线程退出其循环时,它必须释放 mutex 否则主线程将进入死锁(等待 对于被终止的辅助线程锁定的互斥锁)。

因此,这是具有上述修复/增强功能的最后一个提案:

#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>


#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0

struct message 
{
    uint64_t iteration;
    char req_time[28];
};

static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;

pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;


static void
termination_handler(int signum)
{
    running = false;
}


static void 
alarm_handler(int signum)
{
    work = true;
}


static void
write_msg(void)
{

    struct timeval current_time;
    gettimeofday(&current_time, NULL);
    printf("\nLoop count: %lu\n", loop_count);
    printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                           (current_time.tv_usec - previous_time.tv_usec));
    previous_time = current_time;

    // format timeval struct
    char tmbuf[64];
    time_t nowtime = current_time.tv_sec;
    struct tm *nowtm = localtime(&nowtime);
    strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);

    // write values
    pthread_mutex_lock(&mutexmsg);
    msg.iteration = loop_count;
    snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
    pthread_cond_signal(&data_updated_cv);
    pthread_mutex_unlock(&mutexmsg);

    loop_count++;
}


static void* 
process_msg(void *args)
{
    while(1)
    {
        pthread_mutex_lock(&mutexmsg);

        printf("Waiting for condition\n");
        pthread_cond_wait(&data_updated_cv, &mutexmsg);
        printf("Condition fulfilled\n");

        if(!running)
        {
            pthread_mutex_unlock(&mutexmsg); // <----- To avoid deadlock
            break;
        }

        struct timeval process_time;
        gettimeofday(&process_time, NULL);

        char tmbuf[64];
        char buf[64];
        time_t nowtime = process_time.tv_sec;
        struct tm *nowtm = localtime(&nowtime);
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
        snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);

        // something that takes longer than the interval time
        //sleep(2);

        printf("[%s] Req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
        pthread_mutex_unlock(&mutexmsg);

    }

    printf("Thread exiting...\n");
    pthread_exit(NULL);
}



int
main(int argc, char* argv[])
{
    pthread_t thread_id;
    pthread_attr_t attr;

    // for portability, set thread explicitly as joinable
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
    {
        perror("pthread_create");
        exit(1);
    }

    pthread_attr_destroy(&attr);

    // signal handling setup
    struct sigaction t;
    t.sa_handler = termination_handler;
    sigemptyset(&t.sa_mask);
    t.sa_flags = 0;
    sigaction(SIGINT, &t, NULL);
    sigaction(SIGTERM, &t, NULL);

    struct sigaction a;
    a.sa_handler = alarm_handler;
    sigemptyset(&a.sa_mask);
    a.sa_flags = 0;
    sigaction(SIGALRM, &a, NULL);
    
    // Set the alarm interval
    alarm_interval.it_interval.tv_sec = 0;
    alarm_interval.it_interval.tv_usec = 0;
    alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
    alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;

    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    gettimeofday(&previous_time, NULL);

    while(1)
    {
        // Reset the alarm interval <-------- Rearm the timer in the main loop
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
          perror("setitimer");
          raise(SIGTERM);
          break;
        }

        // suspending main thread until a signal is caught
        pause();

        if(!running)
        {
            // signal the worker thread to stop execution
            pthread_mutex_lock(&mutexmsg);
            pthread_cond_signal(&data_updated_cv);
            pthread_mutex_unlock(&mutexmsg);

            break;
        }

        if(work)
        {
            write_msg();
            work = false;
        }
    }

    // suspend thread until the worker thread joins back in
    pthread_join(thread_id, NULL);

    // reset the timer
    alarm_interval.it_value.tv_sec = 0;
    alarm_interval.it_value.tv_usec = 0;
    if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
    {
        perror("setitimer");
        exit(1);
    }

    printf("EXIT\n");
    pthread_exit(NULL);
    
}

================================================ ====================

回答原问题

可以使用conditional variable 来等待来自发送者的信号。这使接收方唤醒并通过在 msgrcv() 的标志参数中传递 IPC_NOWAIT 来检查消息队列中的消息。要结束通信,可以张贴“结束通信”消息。也可以使用pthread_con_timedwait() 定期唤醒并检查“通信结束”或“接收条件结束”(例如,通过检查您的全局“运行”变量)。

接收方:

// Mutex initialization
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Condition variable initialization
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
[...]
while (1) {
  // Lock the mutex
  pthread_mutex_lock(&mutex);

  // Check for messages (non blocking thanks to IPC_NOWAIT)
  rc = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, IPC_NOWAIT);
  if (rc == -1) {
    if (errno == ENOMSG) {

      // message queue empty

      // Wait for message notification
      pthread_cond_wait(&cond, &mutex); // <--- pthread_cond_timedwait() can be used to wake up and check for the end of communication or senders...

    } else {
      // Error
    }
  }

  // Handle the message, end of communication (e.g. "running" variable)...

  // Release the lock (so that the sender can post something in the queue)
  pthread_mutex_unlock(&mutex);
}

发送方:

// Prepare the message
[...]
// Take the lock
pthread_mutex_lock(&mutex);

// Send the message
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);

// Wake up the receiver
pthread_cond_signal(&cond);

// Release the lock
pthread_mutex_unlock(&mutex);

注意SYSV message queues 已过时。最好使用全新的Posix services

【讨论】:

  • 感谢您的回复。我研究了 POSIX 实现,发现有一个名为 mq_timedreceive 的函数,如果我理解正确的话,它的作用与您提出的 pthread_con_timedwait 解决方案非常相似。这两种解决方案哪个更可取?
  • 所有解决方案都OK。这取决于您的需求(您想要停止/检测 z 线程终止的方式)。尝试找到产生尽可能少源代码的那个:-)
  • 基于定时器的解决方案通常会触发一些轮询(循环唤醒以进行多次检查)。这也可能不太被动,因为您需要在执行某些操作之前等待计时器过去。基于信令的解决方案更具反应性。只要您想向接收者通知某个事件,信号就会将其唤醒。
【解决方案2】:

在接收线程上,我这样做:

...

while(running)
{
    msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);

希望在现实中你做的比这更多

因为您没有检查您发布的代码中的任何错误状态。这对于一个可能被指定为从不在接收到信号时重新启动的阻塞函数调用是完全错误的(在 Linux 和 Solaris 上也是如此)。每Linux `signal(2)

以下接口被中断后永不重启 通过信号处理程序,无论使用SA_RESTART;他们 当被信号中断时,总是失败并出现错误EINTR 处理程序:

  • ...
  • System V IPC 接口:msgrcv(2)msgsnd(2)semop(2)semtimedop(2)

Solaris sigaction():

SA_RESTART

如果设置并且信号被捕获,则被该信号的处理程序执行中断的函数将被系统透明地重新启动,即fcntl(2)ioctl(2)wait(3C)waitid(2),以及以下函数在终端等慢速设备上:getmsg()getpmsg()(参见 getmsg(2)); putmsg()putpmsg()(见 putmsg(2)); pread()read()readv()(见 read(2)); pwrite()write()writev()(见 write(2)); recv()recvfrom()recvmsg()(见recv(3SOCKET));和send()sendto()sendmsg()(参见send(3SOCKET))。否则,该函数将返回 EINTR 错误。

因此,您的代码需要看起来更像这样才能同时处理错误和信号中断:

volatile sig_atomic_t running;

...

while(running)
{
    errno = 0;
    ssize_t result = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
    if ( result == ( ssize_t ) -1 )
    {
        // if the call failed or no longer running
        // break the loop
        if ( ( errno != EINTR ) || !running )
        {
            break;
        }

        // the call was interrupted by a signal
        continue
    }

    ...
}

提供了使用alarm()SIGALRM 信号处理程序将running 设置为0 以用作超时的机会:

volatile sig_atomic_t running;

void handler( int sig );
{
    running = 0;
}
...

struct sigaction sa;
memset( &sa, 0, sizeof( sa ) );
sa.sa_handler = handler;

sigaction( SIGALRM, &sa, NULL );
while(running)
{
    // 10-sec timeout
    alarm( 10 );

    errno = 0;
    ssize_t result = msgrcv( msqid, &req_msg, sizeof(req_msg.mtext), 0, 0 );
    
    // save errno as alarm() can munge it
    int saved_errno = errno;

    // clear alarm if it hasn't fired yet
    alarm( 0 );

    if ( result == ( ssize_t ) -1 )
    {
        // if the call failed or no longer running
        // break the loop
        if ( ( saved_errno != EINTR ) || !running )
        {
            break;
        }

        // the call was interrupted by a signal
        continue
    }

    ...
}

这几乎肯定可以改进 - 捕获所有极端情况的逻辑相当复杂,并且可能有更简单的方法来做到这一点。

【讨论】:

    【解决方案3】:

    我花了最后一天阅读了很多关于线程和互斥锁的内容,并试图让我的示例程序工作。确实如此,但不幸的是,当我尝试通过 Ctrl+C 关闭它时,它卡住了。原因是(再次)这一次,工作线程等待来自不再发送信号的主线程的信号。

    @Rachid K. 和@Unslander Monica:如果你想再看一遍,这是不是更先进的代码?另外,我认为我必须使用pthread_cond_timedwait 而不是pthread_cond_wait 以避免终止死锁。你能告诉我具体怎么处理吗?

    请注意,程序只是周期性地(间隔 1 秒)将时间戳和循环计数器传递给处理线程,以打印出数据。输出还会显示调用 print 的时间。

    再次感谢!

    #include <stdbool.h> // bool data type
    #include <stdio.h>
    #include <signal.h>
    #include <stdint.h>
    #include <pthread.h>
    #include <stdlib.h>
    
    
    #define ALARM_INTERVAL_SEC 1
    #define ALARM_INTERVAL_USEC 0
    
    
    static bool running = true;
    static struct itimerval alarm_interval;
    static struct timeval previous_time;
    static uint64_t loop_count = 0;
    
    struct message 
    {
        uint64_t iteration;
        char req_time[28];
    } msg;
    
    pthread_mutex_t mutexmsg;
    pthread_cond_t data_updated_cv;
    
    
    static void
    signal_handler(int signum)
    {
        if (signum == SIGINT || signum == SIGTERM) 
        {
            running = false;
        }
    }
    
    
    static void
    write_msg(int signum)
    {
        if(!running)
        {
            return;
        }
    
        // Reset the alarm interval
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
            perror("setitimer");
            raise(SIGTERM);
            return;
        }
    
        struct timeval current_time;
        gettimeofday(&current_time, NULL);
        printf("\nLoop count: %lu\n", loop_count);
        printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
                               (current_time.tv_usec - previous_time.tv_usec));
        previous_time = current_time;
    
        // format timeval struct
        char tmbuf[64];
        time_t nowtime = current_time.tv_sec;
        struct tm *nowtm = localtime(&nowtime);
        strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
    
        // write values
        pthread_mutex_lock(&mutexmsg);
        msg.iteration = loop_count;
        snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
        pthread_cond_signal(&data_updated_cv);
        pthread_mutex_unlock(&mutexmsg);
    
        loop_count++;
    }
    
    
    static void* 
    process_msg(void *args)
    {
        while(running)
        {
            pthread_mutex_lock(&mutexmsg);
    
            printf("Waiting for condition\n");
            pthread_cond_wait(&data_updated_cv, &mutexmsg);
            printf("Condition fulfilled\n");
            struct timeval process_time;
            gettimeofday(&process_time, NULL);
    
            char tmbuf[64];
            char buf[64];
            time_t nowtime = process_time.tv_sec;
            struct tm *nowtm = localtime(&nowtime);
            strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
            snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
    
            printf("[%s] Message req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
            pthread_mutex_unlock(&mutexmsg);
    
        }
    
        pthread_exit(NULL);
    }
    
    
    int
    main(int argc, char* argv[])
    {
        pthread_t thread_id;
        pthread_attr_t attr;
    
        // for portability, set thread explicitly as joinable
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    
        if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
        {
            perror("pthread_create");
            exit(1);
        }
    
        pthread_attr_destroy(&attr);
    
        // signal handling setup
        struct sigaction s;
        s.sa_handler = signal_handler;
        sigemptyset(&s.sa_mask);
        s.sa_flags = 0;
        sigaction(SIGINT, &s, NULL);
        sigaction(SIGTERM, &s, NULL);
    
        struct sigaction a;
        a.sa_handler = write_msg;
        sigemptyset(&a.sa_mask);
        a.sa_flags = 0;
        sigaction(SIGALRM, &a, NULL);
        
        // Set the alarm interval
        alarm_interval.it_interval.tv_sec = 0;
        alarm_interval.it_interval.tv_usec = 0;
        alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
        alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
    
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
            perror("setitimer");
            exit(1);
        }
    
        gettimeofday(&previous_time, NULL);
    
        // suspend thread until the worker thread joins back in
        pthread_join(thread_id, NULL);
    
        // reset the timer
        alarm_interval.it_value.tv_sec = 0;
        alarm_interval.it_value.tv_usec = 0;
        if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
        {
            perror("setitimer");
            exit(1);
        }
    
        pthread_exit(NULL);
        return 0;
    }
    

    【讨论】:

    • 当 CTRL-C 的信号处理程序被触发时,线程可能正在等待 pthread_cond_wait() ,因此您会卡住。只需添加一个:“pthread_mutex_lock(&mutexmsg); pthread_cond_signal(&data_updated_cv); pthread_mutex_unlock(&mutexmsg);”在 SIGINT 的信号处理程序中,在 "running" 设置为 false 之后。
    • 为了使其更安全并避免一些编译器优化将全局“运行”放入线程内的寄存器中,您应该将其定义为:static volatile bool running = true;
    • 您可能还需要在 pthread_cond_wait() 之后检查“正在运行”以查看唤醒是计时器还是 SIGINT/SIGTERM。
    • @Rachid K. 这是有道理的。但是将这样复杂的多行代码放入信号处理程序中不是很危险吗?有人告诉我,那里只有像设置变量这样的原子操作是可以接受的,以防触发另一个信号,阻止信号处理程序完成其任务。
    • 您在 SIGALRM 处理程序中执行相同的操作。通常,最好将事件集中在一个事件循环中(例如在主程序中),这将向线程发出信号。例如,它的主程序执行:while (1) { pause(); // 使信号对应的动作 },每次pause()返回,就表示接收到一个信号,然后pthread_cond_signal()可以被调用或者运行从那里设置为false。
    【解决方案4】:

    除了作为同步原语之外,您还没有证明使用消息队列是合理的。您可以通过变量和原子标志传递消息以指示消息准备就绪。 This answer 然后描述了如何使用条件变量实现线程暂停和恢复。这就是它通常在线程之间完成的方式,尽管当然不是唯一的方式。

    我不知道如何让 msgrcv 知道它不能期待更多消息

    没必要。只需发送一条消息,告诉线程完成! running 变量不属于:您正在尝试与其他线程通信,所以按照您选择的方式进行操作:向它发送消息!

    【讨论】:

    • 感谢您的回复。您能否详细说明最后一部分,即全局运行变量应该被终止消息替换?如果我理解正确,则应将一条特殊的错误消息发送到队列,以防任一线程出现错误。接收器中的 while 循环应该始终为真,如果我收到错误消息,我会中断它。对吗?
    • 另外,另一个答案指出使用消息队列的 POSIX 实现,而不是提供定时接收功能。这是否比结合使用 pthread 和原子标志更优雅?
    猜你喜欢
    • 2011-11-24
    • 2012-05-12
    • 2011-10-23
    • 1970-01-01
    • 1970-01-01
    • 2022-08-10
    • 2012-12-18
    • 1970-01-01
    • 2014-03-23
    相关资源
    最近更新 更多