【发布时间】:2016-04-22 23:17:28
【问题描述】:
我正在尝试使用两个进程方案来实现生产者-消费者问题。
Process1 - 生产者,Process2 - 消费者。消费者进程正在等待条件变量 (pthread_cond_wait(cond)),生产者将通过 pthread_cond_signal(cond) 向消费者发送信号。
我浏览了这些链接shared mutex and condition variable across processpthread_mutexattr_setpshared,到处都说可以跨多个进程使用互斥锁和条件变量,
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
在这个链接shared mutexes,按照void的建议,据说检查我的系统是否支持?我检查了一下,我得到了 200809 作为sysconf(_SC_THREAD_PROCESS_SHARED) 的返回值,这表示我的系统支持PTHREAD_PROCESS_SHARED。
我正在尝试将pthread_cond_signal 从生产者 (process-1) 发送到消费者 (Process-2)。 生产者和消费者都使用相同的已初始化互斥体/条件变量。
但是,消费者没有收到信号。似乎信号未发送或丢失。
我哪里出错了?我正在使用 Ubuntu,gcc-4.6.3。
这是我的代码。
生产者.c:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <syscall.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdbool.h>
pthread_cond_t* condition;
pthread_mutex_t* mutex;
#define OKTOWRITE "/oktowrite"
#define MESSAGE "/message"
#define MUTEX "/lock"
struct shared_use_st
{
bool conditionSatisfied;
};
struct shared_use_st *shared_stuff;
void create_shared_memory()
{
int shmid;
void *shared_memory=(void *)0;
shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );
if (shmid == -1)
{
fprintf(stderr,"shmget failed\n");
exit(EXIT_FAILURE);
}
shared_memory =shmat(shmid, (void *)0,0);
if(shared_memory == (void *)-1)
{
fprintf(stderr,"shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st *)shared_memory;
}
int main()
{
int des_cond, des_msg, des_mutex;
int mode = S_IRWXU | S_IRWXG;
des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_mutex < 0) {
perror("failure on shm_open on des_mutex");
exit(1);
}
if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);
if (mutex == MAP_FAILED ) {
perror("Error on mmap on mutex\n");
exit(1);
}
des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_cond < 0) {
perror("failure on shm_open on des_cond");
exit(1);
}
if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);
if (condition == MAP_FAILED ) {
perror("Error on mmap on condition\n");
exit(1);
}
/* set mutex shared between processes */
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mutex, &mutexAttr);
/* set condition shared between processes */
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(condition, &condAttr);
create_shared_memory();
shared_stuff->conditionSatisfied=0;
int count=0;
while(count++<10)
{
pthread_mutex_lock(mutex);
shared_stuff->conditionSatisfied=1;
pthread_mutex_unlock(mutex);
pthread_cond_signal(condition);
printf("signal sent to consumer, %d\n",count);
sleep(3);
}
pthread_condattr_destroy(&condAttr);
pthread_mutexattr_destroy(&mutexAttr);
pthread_mutex_destroy(mutex);
pthread_cond_destroy(condition);
shm_unlink(OKTOWRITE);
shm_unlink(MESSAGE);
shm_unlink(MUTEX);
return 0;
}
消费者.c:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <syscall.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdbool.h>
pthread_cond_t* condition;
pthread_mutex_t* mutex;
#define OKTOWRITE "/oktowrite"
#define MESSAGE "/message"
#define MUTEX "/lock"
struct shared_use_st
{
bool conditionSatisfied;
};
struct shared_use_st *shared_stuff;
void create_shared_memory()
{
int shmid;
void *shared_memory=(void *)0;
shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );
if (shmid == -1)
{
fprintf(stderr,"shmget failed\n");
exit(EXIT_FAILURE);
}
shared_memory =shmat(shmid, (void *)0,0);
if(shared_memory == (void *)-1)
{
fprintf(stderr,"shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st *)shared_memory;
}
int main()
{
int des_cond, des_msg, des_mutex;
int mode = S_IRWXU | S_IRWXG;
des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_mutex < 0) {
perror("failure on shm_open on des_mutex");
exit(1);
}
if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);
if (mutex == MAP_FAILED ) {
perror("Error on mmap on mutex\n");
exit(1);
}
des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);
if (des_cond < 0) {
perror("failure on shm_open on des_cond");
exit(1);
}
if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
perror("Error on ftruncate to sizeof pthread_cond_t\n");
exit(-1);
}
condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);
if (condition == MAP_FAILED ) {
perror("Error on mmap on condition\n");
exit(1);
}
/* set mutex shared between processes */
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mutex, &mutexAttr);
/* set condition shared between processes */
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(condition, &condAttr);
create_shared_memory();
shared_stuff->conditionSatisfied=0;
while(1)
{
printf("Receiver waits on for signal from hello1.c \n");
pthread_mutex_lock(mutex);
while(!shared_stuff->conditionSatisfied)
pthread_cond_wait(condition, mutex);
pthread_mutex_unlock(mutex);
printf("Signal received, wake up!!!!!!!!\n");
//reset
pthread_mutex_lock(mutex);
shared_stuff->conditionSatisfied=0;
pthread_mutex_unlock(mutex);
}
}
【问题讨论】:
-
在我看来,您对共享内存的使用是错误的:您正在使用 O_CREATE 和 ftruncate 在两个进程中调用 shm_open。只有一个进程应该创建共享内存段并调整它的大小。另一个应该附在它上面。
-
在处理互斥体之前,您是否尝试在两个进程之间共享某些内容?
-
@terencehill ,AFIK,其他进程将首先检查共享内存是否已经存在。如果它已经存在,则不会创建新的,只会附加到现有的。
-
@terencehill,问题是conditionSatisfied的更新值在消费者进程中不可见。
-
事情发生的顺序是什么?您在生产者和消费者中都进行了截断。
标签: c multithreading gcc