【问题标题】:Unable to send signal using pthread_cond_signal to another process in C无法使用 pthread_cond_signal 将信号发送到 C 中的另一个进程
【发布时间】:2016-04-22 23:17:28
【问题描述】:

我正在尝试使用两个进程方案来实现生产者-消费者问题。

Process1 - 生产者,Process2 - 消费者。消费者进程正在等待条件变量 (pthread_cond_wait(cond)),生产者将通过 pthread_cond_signal(cond) 向消费者发送信号。

我浏览了这些链接shared mutex and condition variable across processpthread_mutexattr_setpshared,到处都说可以跨多个进程使用互斥锁和条件变量,

pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);

在这个链接shared mutexes,按照void的建议,据说检查我的系统是否支持?我检查了一下,我得到了 200809 作为sysconf(_SC_THREAD_PROCESS_SHARED) 的返回值,这表示我的系统支持PTHREAD_PROCESS_SHARED

我正在尝试将pthread_cond_signal 从生产者 (process-1) 发送到消费者 (Process-2)。 生产者和消费者都使用相同的已初始化互斥体/条件变量。

但是,消费者没有收到信号。似乎信号未发送或丢失。

我哪里出错了?我正在使用 Ubuntu,gcc-4.6.3。

这是我的代码。

生产者.c:

    #include <pthread.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/wait.h>
    #include <pthread.h>
    #include <sched.h>
    #include <syscall.h>
    #include <sys/stat.h>
    #include <errno.h>
    #include <sys/mman.h>
    #include <fcntl.h>
    #include <stdbool.h>

    pthread_cond_t* condition;
    pthread_mutex_t* mutex;

    #define OKTOWRITE "/oktowrite"
    #define MESSAGE "/message"
    #define MUTEX "/lock"


    struct shared_use_st 
    {
    bool conditionSatisfied;
    };

    struct shared_use_st *shared_stuff;

    void create_shared_memory()
    {
        int shmid;
        void *shared_memory=(void *)0;
        shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );

        if (shmid == -1)
        {
            fprintf(stderr,"shmget failed\n");
            exit(EXIT_FAILURE);
        }

        shared_memory =shmat(shmid, (void *)0,0);

        if(shared_memory == (void *)-1)
        {
            fprintf(stderr,"shmat failed\n");
            exit(EXIT_FAILURE); 
        }

        shared_stuff = (struct shared_use_st *)shared_memory;
    }


    int main()
    {
        int des_cond, des_msg, des_mutex;
        int mode = S_IRWXU | S_IRWXG;

        des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);

        if (des_mutex < 0) {
            perror("failure on shm_open on des_mutex");
            exit(1);
        }

        if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
            perror("Error on ftruncate to sizeof pthread_cond_t\n");
            exit(-1);
        }

        mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);

        if (mutex == MAP_FAILED ) {
            perror("Error on mmap on mutex\n");
            exit(1);
        }

        des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);

        if (des_cond < 0) {
            perror("failure on shm_open on des_cond");
            exit(1);
        }

        if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
            perror("Error on ftruncate to sizeof pthread_cond_t\n");
            exit(-1);
        }

        condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);

        if (condition == MAP_FAILED ) {
            perror("Error on mmap on condition\n");
            exit(1);
        }

        /* set mutex shared between processes */
        pthread_mutexattr_t mutexAttr;
        pthread_mutexattr_init(&mutexAttr);
        pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(mutex, &mutexAttr);

        /* set condition shared between processes */
        pthread_condattr_t condAttr;
        pthread_condattr_init(&condAttr);
        pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(condition, &condAttr);

        create_shared_memory();
        shared_stuff->conditionSatisfied=0;

        int count=0;
        while(count++<10)
        {
            pthread_mutex_lock(mutex);
            shared_stuff->conditionSatisfied=1;
            pthread_mutex_unlock(mutex);

            pthread_cond_signal(condition);
            printf("signal sent to consumer, %d\n",count);

            sleep(3);
        }

        pthread_condattr_destroy(&condAttr);
        pthread_mutexattr_destroy(&mutexAttr);
        pthread_mutex_destroy(mutex);
        pthread_cond_destroy(condition);

        shm_unlink(OKTOWRITE);
        shm_unlink(MESSAGE);
        shm_unlink(MUTEX);

        return 0;
    }

消费者.c:

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <syscall.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <stdbool.h>

pthread_cond_t* condition;
pthread_mutex_t* mutex;

#define OKTOWRITE "/oktowrite"
#define MESSAGE "/message"
#define MUTEX "/lock"

struct shared_use_st 
{
bool conditionSatisfied;
};

struct shared_use_st *shared_stuff;

void create_shared_memory()
{
    int shmid;
    void *shared_memory=(void *)0;
    shmid =shmget( (key_t)1234, 4096, 0666 | IPC_CREAT );

    if (shmid == -1)
    {
        fprintf(stderr,"shmget failed\n");
        exit(EXIT_FAILURE);
    }

    shared_memory =shmat(shmid, (void *)0,0);

    if(shared_memory == (void *)-1)
    {
        fprintf(stderr,"shmat failed\n");
        exit(EXIT_FAILURE); 
    }

    shared_stuff = (struct shared_use_st *)shared_memory;
}

int main()
{
    int des_cond, des_msg, des_mutex;
    int mode = S_IRWXU | S_IRWXG;

    des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode);

    if (des_mutex < 0) {
        perror("failure on shm_open on des_mutex");
        exit(1);
    }

    if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
        perror("Error on ftruncate to sizeof pthread_cond_t\n");
        exit(-1);
    }

    mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);

    if (mutex == MAP_FAILED ) {
        perror("Error on mmap on mutex\n");
        exit(1);
    }

    des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode);

    if (des_cond < 0) {
        perror("failure on shm_open on des_cond");
        exit(1);
    }

    if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
        perror("Error on ftruncate to sizeof pthread_cond_t\n");
        exit(-1);
    }

    condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);

    if (condition == MAP_FAILED ) {
        perror("Error on mmap on condition\n");
        exit(1);
    }

    /* set mutex shared between processes */
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(mutex, &mutexAttr);

    /* set condition shared between processes */
    pthread_condattr_t condAttr;
    pthread_condattr_init(&condAttr);
    pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(condition, &condAttr);

    create_shared_memory();
    shared_stuff->conditionSatisfied=0;

    while(1)
    {
        printf("Receiver waits on for signal from hello1.c \n");

        pthread_mutex_lock(mutex);
        while(!shared_stuff->conditionSatisfied)
            pthread_cond_wait(condition, mutex);
        pthread_mutex_unlock(mutex);

        printf("Signal received, wake up!!!!!!!!\n");

        //reset
        pthread_mutex_lock(mutex);
        shared_stuff->conditionSatisfied=0;
        pthread_mutex_unlock(mutex);
    }

}

【问题讨论】:

  • 在我看来,您对共享内存的使用是错误的:您正在使用 O_CREATE 和 ftruncate 在两个进程中调用 shm_open。只有一个进程应该创建共享内存段并调整它的大小。另一个应该附在它上面。
  • 在处理互斥体之前,您是否尝试在两个进程之间共享某些内容?
  • @terencehill ,AFIK,其他进程将首先检查共享内存是否已经存在。如果它已经存在,则不会创建新的,只会附加到现有的。
  • @terencehill,问题是conditionSatisfied的更新值在消费者进程中不可见。
  • 事情发生的顺序是什么?您在生产者和消费者中都进行了截断。

标签: c multithreading gcc


【解决方案1】:

进程间共享的互斥锁只能由其中一个初始化

错误是您正在初始化两个进程的 mutexcondition。但是,由于它们是共享的,您应该只在 consumer 上初始化它们。

我还建议不要将旧的 System V shmget/shmat 函数与新的 POSIX shm_open 混合使用。

我定义了一个新的共享整数shint,它指向一个用shm_open初始化并通过mmap附加的共享内存。 shint 用作在 pthread condition 上等待的标志。

在我看来,在消费者之后开始的生产者不需要截断内存段,只需O_RDWR就可以打开共享内存。生产者首先获取锁并将 shint 设置为 1。

另一方面,首先启动的消费者必须创建共享内存段并使用ftruncate调整它们的大小。它还将共享标志 shint 设置为 0 并等待它。此外,在调用 shm_open 之前,最好取消链接(通过 shm_unlik)所有使用的共享内存段,以清除先前调用中的最终错误(例如,如果代码在取消链接之前崩溃)。

我在开头移动了互斥体和条件的属性初始化,因为它看起来更清晰和正确。


Producer.c

int main()
{
    int des_cond, des_msg, des_mutex;
    int mode = S_IRWXU | S_IRWXG;

    /* set mutex shared between processes */
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);

    /* set condition shared between processes */
    pthread_condattr_t condAttr;
    pthread_condattr_init(&condAttr);
    pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);

    des_mutex = shm_open(MUTEX, O_RDWR, mode);

    if (des_mutex < 0) {
        perror("failure on shm_open on des_mutex");
        exit(1);
    }

    mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);

    if (mutex == MAP_FAILED ) {
        perror("Error on mmap on mutex\n");
        exit(1);
    }

    des_cond = shm_open(OKTOWRITE, O_RDWR, mode);

    if (des_cond < 0) {
        perror("failure on shm_open on des_cond");
        exit(1);
    }

    condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);

    if (condition == MAP_FAILED ) {
        perror("Error on mmap on condition\n");
        exit(1);
    }

    int fd = shm_open(MESSAGE, O_RDWR, 0644);
    if (fd < 0) {
        perror("failure on shm_open on fd");
        exit(1);
    }

    if(ftruncate(fd, sizeof(int)) == -1) {
        perror("Error on ftruncate to sizeof ftruncate fd\n");
        exit(-1);
    }

    int *shint;
    shint = (int *) mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, fd, 0);

    if (shint == MAP_FAILED ) {
        perror("Error on mmap on shint\n");
        exit(1);
    }


    // set ot 0
    *shint = 0;


    int count=0;
    while(count++<10)
    {
        pthread_mutex_lock(mutex);
        *shint = 1;
        pthread_mutex_unlock(mutex);

        pthread_cond_signal(condition);
        printf("signal sent to consumer, %d\n",count);

        sleep(3);
    }

    pthread_condattr_destroy(&condAttr);
    pthread_mutexattr_destroy(&mutexAttr);
    pthread_mutex_destroy(mutex);
    pthread_cond_destroy(condition);

    shm_unlink(OKTOWRITE);
    shm_unlink(MESSAGE);
    shm_unlink(MUTEX);

    return 0;
}

Consumer.c

int main()
{
    int des_cond, des_msg, des_mutex;
    int mode = S_IRWXU | S_IRWXG;

    // Unlink first to clean  
    shm_unlink(MUTEX);
    shm_unlink(OKTOWRITE);
    shm_unlink(MESSAGE);

    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);

    pthread_condattr_t condAttr;
    pthread_condattr_init(&condAttr);
    pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);


    des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR, mode);

    if (des_mutex < 0) {
        perror("failure on shm_open on des_mutex");
        exit(1);
    }

    if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) {
        perror("Error on ftruncate to sizeof pthread_cond_t\n");
        exit(-1);
    }

    mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0);

    if (mutex == MAP_FAILED ) {
        perror("Error on mmap on mutex\n");
        exit(1);
    }

    des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR, mode);

    if (des_cond < 0) {
        perror("failure on shm_open on des_cond");
        exit(1);
    }

    if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) {
        perror("Error on ftruncate to sizeof pthread_cond_t\n");
        exit(-1);
    }

    condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t),PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0);

    if (condition == MAP_FAILED ) {
        perror("Error on mmap on condition\n");
        exit(1);
    }

    int fd = shm_open(MESSAGE, O_CREAT|O_RDWR, 0644);
    if (fd < 0) {
        perror("failure on shm_open on fd");
        exit(1);
    }

    if(ftruncate(fd, 16) == -1) {
        perror("Error on ftruncate to sizeof ftruncate fd\n");
        exit(-1);
    }
    int *shint;
    shint = (int *) mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, fd, 0);

    if (shint == MAP_FAILED ) {
        perror("Error on mmap on shint\n");
        exit(1);
    }


    *shint = 0;

        if (pthread_mutex_init(mutex, &mutexAttr) != 0)    {printf("Error initi mutex"); exit(111);}
        if (pthread_cond_init(condition, &condAttr) != 0) {printf("Error initi cond");  exit(111);}

    while(1)
    {
        printf("Receiver waits on for signal from hello1.c \n");

        pthread_mutex_lock(mutex);
        while(shint == 0)
            pthread_cond_wait(condition, mutex);
        printf("Waiting"); sleep(1);
        pthread_mutex_unlock(mutex);

        printf("Signal received, wake up!!!!!!!!\n");

        break;
        //reset
        pthread_mutex_lock(mutex);
        shared_stuff->conditionSatisfied=0;
        pthread_mutex_unlock(mutex);
    }

    return 0;
}

【讨论】:

  • 谢谢,从生产者向消费者发送信号非常有效。我想将数据从生产者发送到消费者,这就是我使用共享内存的原因。您对发送数据有什么建议?我应该只使用 shm_open 创建一个共享内存,就像你做了 shint int memory 一样吗?
  • 这取决于您要达到的目标。我个人会坚持使用共享内存:您可以实现几乎所有类型的通信,只需使用正确的同步机制和共享内存的数据布局。
猜你喜欢
  • 2016-08-17
  • 2011-04-16
  • 2016-09-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-15
  • 1970-01-01
  • 1970-01-01
  • 2011-07-21
相关资源
最近更新 更多