【发布时间】:2021-03-09 19:17:03
【问题描述】:
我有一个多线程程序,它的 2 个线程通过消息队列相互通信。第一个线程(发送者)定期发送消息,而第二个线程(接收者)处理信息。
发件人有类似这样的代码:
// Create queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);
// Create message and send
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);
在接收线程上,我这样做:
// Subscribe to queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);
struct request_msg req_msg;
while(running)
{
msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
// Do sth with the message
}
如您所见,接收器位于一个由名为“running”的全局变量控制的 while 循环中。如果在进程中遇到错误,错误处理程序会将布尔值设置为 false。这在大多数情况下都有效,但如果在能够向队列发送消息之前发生错误,接收方将不会退出 while 循环,因为它会在继续之前等待消息,从而检查运行变量。这意味着它将永远挂在那里,因为发送者在运行时的其余时间不会发送任何东西。
我想避免这种情况,但我不知道如何让 msgrcv 知道它不能期待更多消息。如果我终止队列,我无法了解 msgrcv 的行为方式,假设这是最简单的版本。也许超时或发送某种终止消息(可能使用消息结构的 mtype 成员)也是可能的。
请让我知道解决这个问题最可靠的方法是什么。谢谢!
编辑:根据建议,我重新编写了代码以使信号处理程序动作原子化。
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
struct message
{
uint64_t iteration;
char req_time[28];
};
static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
termination_handler(int signum)
{
running = false;
}
static void
alarm_handler(int signum)
{
work = true;
}
static void
write_msg(void)
{
// Reset the alarm interval
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
return;
}
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("\nLoop count: %lu\n", loop_count);
printf("Loop time: %f us\n", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(1)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for condition\n");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilled\n");
if(!running)
{
break;
}
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
// something that takes longer than the interval time
// sleep(1);
printf("[%s] Req time: %s loop cnt: %lu\n", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction t;
t.sa_handler = termination_handler;
sigemptyset(&t.sa_mask);
t.sa_flags = 0;
sigaction(SIGINT, &t, NULL);
sigaction(SIGTERM, &t, NULL);
struct sigaction a;
a.sa_handler = alarm_handler;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
while(1)
{
// suspending main thread until a signal is caught
pause();
if(!running)
{
// signal the worker thread to stop execution
pthread_mutex_lock(&mutexmsg);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
break;
}
if(work)
{
write_msg();
work = false;
}
}
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
printf("EXIT\n");
pthread_exit(NULL);
}
【问题讨论】:
-
如果没有待处理的消息,您可以告诉
msgrcv()立即返回错误消息。请参阅手册页。 -
而且较新的 POSIX 消息队列支持超时等待消息。
-
@Shawn 感谢您的意见。如果没有消息,我不希望它立即返回。原因是发送者以一秒的间隔写入消息队列,而接收者必须比这更快。所以等到下一条消息到达是期望的行为。不过,我肯定会研究 POSIX 消息队列。
标签: c ipc message-queue