【Question title】: How to share semaphores between processes using shared memory
【Posted】: 2012-01-11 15:49:10
【Question】:

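I have to synchronize N client processes with one server. The processes are forked by a main function in which I declared 3 semaphores. I decided to use POSIX semaphores, but I don't know how to share them between these processes. I think shared memory should work, but I have some questions: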

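  • How do I allocate the right amount of memory in my segment?
  • Can I pass sizeof(sem_t) as the size_t argument of shmget to allocate exactly the space I need?
  • Does anyone have an example of a similar situation?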

【Question comments】:

    Tags: c synchronization client-server semaphore shared


    【Answer 1】:

    Sharing named POSIX semaphores is easy:

    • Choose a name for your semaphore

      #define SNAME "/mysem"
      
    • Use sem_open with O_CREAT in the process that creates them

      sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */
      
    • Open the semaphore in the other processes

      sem_t *sem = sem_open(SNAME, 0); /* Open a preexisting semaphore. */
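To see the three steps working together, here is a minimal sketch in which a parent creates a named semaphore and a forked child opens it again by name, the way an unrelated process would. The helper name `named_sem_demo` and the semaphore name passed to it are made up for illustration, and the initial value 0 (so that the child blocks until the parent posts) is my choice, not something from the answer.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: returns 0 if the child was released through the
 * named semaphore, -1 on any error. */
int named_sem_demo(const char *name)
{
    sem_unlink(name);   /* ignore failure: drops a stale semaphore, if any */

    /* Creator side: initial value 0, so sem_wait() blocks until a post. */
    sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0644, 0);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return -1;
    }

    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        sem_close(sem);
        sem_unlink(name);
        return -1;
    }
    if (pid == 0) {
        /* Child: open the existing semaphore by name only. */
        sem_t *s = sem_open(name, 0);
        if (s == SEM_FAILED)
            _exit(1);
        if (sem_wait(s) != 0)   /* blocks until the parent posts */
            _exit(2);
        sem_close(s);
        _exit(0);
    }

    sem_post(sem);              /* release the child */

    int status = 0;
    waitpid(pid, &status, 0);
    sem_close(sem);
    sem_unlink(name);           /* remove the name once nobody needs it */
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Calling `named_sem_demo("/mysem")` from `main` should return 0 on a system with POSIX named semaphores. On older glibc versions you may need to link with `-pthread`.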
      

    If you insist on using shared memory, that's certainly possible.

    int fd = shm_open("/shmname", O_CREAT | O_RDWR, 0644); /* flags first, then mode */
    ftruncate(fd, sizeof(sem_t));
    sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
        MAP_SHARED, fd, 0);
    
    sem_init(sem, 1, 1);
    

    I haven't tested the above, so it could be completely bonkers.
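For what it's worth, here is a fuller sketch of the shared-memory variant with error checks. It also addresses the sizing question: put the semaphore and the data it protects in one struct and size the segment with `sizeof` of that struct. The names `struct shared` and `shm_sem_demo` are hypothetical. I am also assuming the workers are forked children, as in the question, so the segment name can be unlinked right after mapping; unrelated processes would have to keep the name and call `shm_open` on it themselves.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

/* Everything the processes share lives in one struct, so the segment
 * size is simply sizeof(struct shared). */
struct shared {
    sem_t sem;      /* unnamed semaphore, initialised with pshared = 1 */
    int   counter;  /* data protected by the semaphore */
};

/* Hypothetical helper: forks nchildren processes that each increment the
 * shared counter iters times. Returns the final counter, or -1 on error. */
int shm_sem_demo(const char *shm_name, int nchildren, int iters)
{
    shm_unlink(shm_name);   /* ignore failure: drops a stale object, if any */
    int fd = shm_open(shm_name, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (fd == -1)
        return -1;
    if (ftruncate(fd, sizeof(struct shared)) == -1) {
        close(fd);
        shm_unlink(shm_name);
        return -1;
    }
    struct shared *sh = mmap(NULL, sizeof *sh, PROT_READ | PROT_WRITE,
                             MAP_SHARED, fd, 0);
    close(fd);              /* the mapping stays valid after close */
    shm_unlink(shm_name);   /* forked children inherit the mapping */
    if (sh == MAP_FAILED)
        return -1;

    sh->counter = 0;
    if (sem_init(&sh->sem, 1, 1) == -1) {   /* pshared = 1, value = 1 */
        munmap(sh, sizeof *sh);
        return -1;
    }

    int forked = 0;
    for (int i = 0; i < nchildren; i++) {
        pid_t pid = fork();
        if (pid < 0)
            break;
        if (pid == 0) {
            for (int j = 0; j < iters; j++) {
                sem_wait(&sh->sem);
                sh->counter++;          /* critical section */
                sem_post(&sh->sem);
            }
            _exit(0);
        }
        forked++;
    }
    while (wait(NULL) > 0)
        ;                               /* reap every child */

    int result = (forked == nchildren) ? sh->counter : -1;
    sem_destroy(&sh->sem);
    munmap(sh, sizeof *sh);
    return result;
}
```

With 4 children and 1000 iterations each, the function should return 4000 if the semaphore really serialises the increments. Depending on the glibc version, linking may need `-pthread -lrt`.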

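    【Comments】:

    • With named semaphores I don't have to worry about memory allocation! I didn't know how to do that. Thank you very much
    • Yes, but using named semaphores seems more elegant to me
    • @cnicutar, your answer is great. I know you haven't tested it. Could you compare it with: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attr); pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, attr);
    • Doesn't naming the semaphore mean that you can't have multiple instances of the program open at the same time without them interfering with each other?
    • @DavidRoundy The name can be dynamic (e.g. a command-line argument), in which case multiple instances can run.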
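A small sketch of that last point: build the semaphore name from a per-instance string, such as a command-line argument. The helper name `make_sem_name` and the `/prefix-instance` naming scheme are assumptions of mine, not something the commenters specified.

```c
#include <stdio.h>

/* Hypothetical helper: writes "/<prefix>-<instance>" into buf.
 * Returns 0 on success, -1 if the name does not fit in len bytes. */
int make_sem_name(char *buf, size_t len, const char *prefix,
                  const char *instance)
{
    int n = snprintf(buf, len, "/%s-%s", prefix, instance);
    return (n > 0 && (size_t)n < len) ? 0 : -1;
}
```

Each instance would then call something like `make_sem_name(name, sizeof name, "mysem", argv[1])` and pass `name` to `sem_open`, so two instances started with different arguments end up with different semaphores.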
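    【Answer 2】:

    I wrote a sample application in which a parent (producer) spawns N child (consumer) threads and uses a global array to pass data between them. The global array is a circular buffer that is protected while data is being written to it.

    Header file: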

    #define TRUE 1
    #define FALSE 0
    
    #define SUCCESS 0
    #define FAILURE -1
    
    /*
     * Function Declaration.
     */
    void Parent( void );
    int CalcBitmap(unsigned int Num);
    void InitSemaphore( void );
    void DeInitSemaphore( void );
    int ComputeComplexCalc( int op1, int op2);
    
    /*
     * Thread functions.
     */
    void *Child( void *arg );
    void *Report( void *arg1 );
    
    /*
     * Macro Definition.
     */
    #define INPUT_FILE  "./input.txt"
    #define NUM_OF_CHILDREN 10
    #define SIZE_CIRCULAR_BUF 256
    #define BUF_SIZE 128
    //Set the bits corresponding to all threads.
    #define SET_ALL_BITMAP( a ) ( (a) = uiBitmap )
    //Clears the bit corresponding to a thread after
    //every read. 
    #define CLEAR_BIT( a, b ) ( (a) &= ~( 1u << (b) ) )
    
    /*
     * Have a dedicated semaphore for each buffer
     * to synchronize the access to the shared memory
     * between the producer (parent) and the consumers. 
     */
    sem_t ReadCntrMutex;
    sem_t semEmptyNode[SIZE_CIRCULAR_BUF];
    
    /*
     * Global Variables.
     */
    unsigned int uiBitmap = 0;
    //Counter to track the number of processed buffers.
    unsigned int Stats;                           
    unsigned int uiParentCnt = 0;
    char inputfile[BUF_SIZE];
    int EndOfFile = FALSE;
    
    /*
     * Data Structure Definition. ( Circular Buffer )
     */
    struct Message
    {
        int id;
        unsigned int BufNo1;
        unsigned int BufNo2;
        /* Counter to check if all threads read the buffer. */
        unsigned int uiBufReadCntr;
    };
    struct Message ShdBuf[SIZE_CIRCULAR_BUF];
    

    Source code:

    /*
     * ipc_communicator is an IPC binary where a parent
     * creates ten child threads, reads the input parameters
     * from a file, and sends those parameters to all the children.
     * Each child performs the computation on those input parameters
     * and writes the results to its own file.
     *
     *  Author: Ashok Vairavan              Date: 8/8/2014
     */
    
    /*
     * Generic Header Files.
     */
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <pthread.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <getopt.h>
    #include <stdlib.h>   /* malloc, free, exit */
    #include <math.h>
    #include <semaphore.h>
    
    /*
     * Private Header Files.
     */
    #include "ipc*.h"
    
    /*
     * Function to calculate the bitmap based on the 
     * number of threads. This bitmap is used to 
     * track which thread read the buffer and the
     * pending read threads.
     */
    int CalcBitmap(unsigned int Num)
    {
       unsigned int uiIndex;
    
       for( uiIndex = 0; uiIndex < Num; uiIndex++ )
       {
          uiBitmap |= 1 << uiIndex;
       }
       return uiBitmap;
    }
    /*
     * Function that performs complex computation on
     * numbers.
     */
    int ComputeComplexCalc( int op1, int op2)
    {
        return sqrt(op1) + log(op2);
    }
    
    /*
     * Function to initialise the semaphores. 
     * semEmptyNode[i] indicates that buffer i is available
     * for writing. ReadCntrMutex protects the per-buffer
     * bitmap of readers that still have to read it.
     */
    void InitSemaphore( void )
    {
       unsigned int uiIndex=0;
    
       CalcBitmap(NUM_OF_CHILDREN);
    
       sem_init( &ReadCntrMutex, 0, 1);
       while( uiIndex<SIZE_CIRCULAR_BUF )
       {
          sem_init( &semEmptyNode[uiIndex], 0, 1 );
          uiIndex++;
       }
       return;
    }
    
    /* 
     * Function to de-initialise the semaphores.
     */
    void DeInitSemaphore( void )
    {
       unsigned int uiIndex=0;
    
       sem_destroy( &ReadCntrMutex);
       while( uiIndex<SIZE_CIRCULAR_BUF )
       {
          sem_destroy( &semEmptyNode[uiIndex] );
          uiIndex++;
       }
       return;
    }
    
    /* Returns TRUE if the parent had reached end of file */
    int isEOF( void )
    {
       return __sync_fetch_and_add(&EndOfFile, 0);
    }
    
    /*
     * Thread function that prints the number of buffers
     * processed per second.
     */ 
    void *Report( void *arg1 )
    {
       unsigned int CumStats = 0, PerSecond;
    
       while( TRUE )
       {
          sleep( 1 );
          /* Atomically read the per-second counter and reset it to zero. */
          PerSecond = __sync_fetch_and_and( &Stats, 0 );
          CumStats += PerSecond;
          printf("Processed Per Second: %u Cumulative Stats: %u\n", 
                   PerSecond, CumStats);
       }
       return NULL;
    }
    
    /*
     * Function that reads the data from the input file
     * fills the shared buffer and signals the children to
     * read the data.
     */
    void Parent( void )
    {
       unsigned int op1, op2;
       unsigned int uiCnt = 0;
    
       /* Read the input parameters and process it in
        * while loop till the end of file.
        */
    
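       FILE *fp = fopen( inputfile, "r" );
       if( fp == NULL )
       {
          /* Without input the children would wait forever. */
          perror( "fopen" );
          exit( FAILURE );
       }
       /* op1 and op2 are unsigned, so read them with %u. */
       while( fscanf( fp, "%u %u", &op1, &op2 ) == 2 )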
       {
         /* Wait for the buffer to become empty */
         sem_wait(&semEmptyNode[uiCnt]);
    
         /* Access the shared buffer */
         ShdBuf[uiCnt].BufNo1 = op1;
         ShdBuf[uiCnt].BufNo2 = op2;
    
         /* Bitmap to track which thread read the buffer */
         sem_wait( &ReadCntrMutex );
         SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
         sem_post( &ReadCntrMutex );
    
         __sync_fetch_and_add( &uiParentCnt, 1);
         uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
       }
    
       /* Whether the loop stopped at end of file or on malformed
        * input, tell the children that no more data is coming,
        * otherwise they would poll forever.
        */ 
       __sync_add_and_fetch(&EndOfFile, TRUE);
       fclose( fp ); 
    
       return;
    }
    
    /* 
     * Child thread function which takes threadID as an
     * argument, creates a file using threadID, fetches the
     * parameters from the shared memory, computes the calculation,
     * writes the output to their own file and will release the 
     * semaphore when all the threads read the buffer.
     */
    void *Child( void *ThreadPtr )
    {
       unsigned int uiCnt = 0, uiTotalCnt = 0;
       unsigned int op1, op2;
       char filename[BUF_SIZE];
       int ThreadID;
    
       ThreadID = *((int *) ThreadPtr);
       free( ThreadPtr );
    
       snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );
    
       FILE *fp = fopen( filename, "w" );
       if( fp == NULL )
       {
          perror( "fopen" );
          return NULL;
       }
    
       while (TRUE)
       {
          /* Wait till the parent fills the buffer */
          if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
          {
             /* Access the shared memory */
             op1 = ShdBuf[uiCnt].BufNo1;
             op2 = ShdBuf[uiCnt].BufNo2;
    
             fprintf(fp, "%u %u = %d\n", op1, op2, 
                                   ComputeComplexCalc( op1, op2) ); 
    
             sem_wait( &ReadCntrMutex );
             ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
             if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
             {
                __sync_add_and_fetch(&Stats, 1);
    
                /* Release the semaphore lock if 
                   all readers read the data */
                sem_post(&semEmptyNode[uiCnt]);
             }
             sem_post( &ReadCntrMutex );
    
             uiTotalCnt++;
             uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;
    
             /* If the parent reaches the end of file and 
                if the child thread read all the data then break out */
             if( isEOF() && ( uiTotalCnt ==  uiParentCnt ) )
             {
                //printf("Thid %d p %d c %d\n", ThreadID, uiParentCnt, uiCnt );
                break;
             }
          }
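          else
          {
             /* Nothing new to read. If the parent is done (for
                example the input file was empty), stop polling. */
             if( isEOF() && ( uiTotalCnt == uiParentCnt ) )
             {
                break;
             }
             /* Sleep for ten microseconds before checking 
                the shared memory again */
             usleep(10);
          }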
       }
       fclose( fp );
       return NULL;
    }
    
    void usage( void )
    {
        printf(" Usage:\n");
        printf("         -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt\n");
        printf("         -?  - Help Menu. \n" );
        exit(1);
    }
    
    int main( int argc, char *argv[])
    {
       pthread_attr_t ThrAttr;
       pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
       unsigned int uiThdIndex = 0;
       unsigned int *pThreadID;
       int c;
    
       strncpy( inputfile, INPUT_FILE, BUF_SIZE );
    
       while( ( c = getopt( argc, argv, "f:" )) != -1 )
       {
           switch( c )
           {
               case 'f':
                    /* Leave the last byte untouched so the path stays NUL-terminated. */
                    strncpy( inputfile, optarg, BUF_SIZE - 1 );
                    break;
    
               default:
                    usage();
            }
       }
    
       if( access( inputfile, F_OK ) == -1 )
       {
          perror( "access" );
          return FAILURE;
       } 
    
       InitSemaphore();
    
       pthread_attr_init( &ThrAttr );
       while( uiThdIndex< NUM_OF_CHILDREN )
       {
          /* Allocate the memory from the heap and pass it as an argument
           * to each thread to avoid race condition and invalid reference
           * issues with the local variables.
           */
          pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
          if( pThreadID == NULL )
          {
             perror( "malloc" );
             /* Cancel pthread operation */
             return FAILURE;
          }
          *pThreadID = uiThdIndex;
          /* pthread_create returns the error number; it does not set errno. */
          c = pthread_create( &ThrChild[uiThdIndex], NULL, &Child, pThreadID );
          if( c != 0 )
          {
             printf( "pthread %u creation failed. Error: %s\n", uiThdIndex, strerror( c ) );
             free( pThreadID );
             return FAILURE;
          }
          uiThdIndex++;
       }
    
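       /* The report thread prints statistics on the communication
        * between the parent and the child threads.
        */
       if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
       {
          /* pthread_create does not set errno, so perror would mislead. */
          fprintf( stderr, "pthread_create failed for the report thread\n" );
          return FAILURE;
       }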
    
       Parent();
    
       uiThdIndex = 0;
       while( uiThdIndex < NUM_OF_CHILDREN )
       {
          pthread_join( ThrChild[uiThdIndex], NULL );
          uiThdIndex++;
       }
    
       /*
        * Cancel the report thread once all the 
        * children have exited.
        */
       pthread_cancel( ThrReport );
    
       DeInitSemaphore();
    
       return SUCCESS;
    }
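The part of this program that is easiest to miss is the per-buffer reader bitmap: the producer sets one bit per consumer, each consumer clears its own bit after reading, and whoever clears the last bit releases the slot. Here is that idea isolated as a minimal single-threaded sketch. The function names `all_readers_mask` and `mark_read` are hypothetical, and in the real program the update still has to happen while holding the mutex (ReadCntrMutex above).

```c
#include <limits.h>

/* Bitmap with the low n bits set, one bit per reader thread. */
unsigned int all_readers_mask(unsigned int n)
{
    unsigned int bits = sizeof(unsigned int) * CHAR_BIT;
    /* Shifting by the full width is undefined, so handle it separately. */
    return (n >= bits) ? ~0u : ((1u << n) - 1u);
}

/* Clears the bit of reader `id` in *pending. Returns 1 when that was the
 * last pending reader, i.e. the slot may be handed back to the producer.
 * The caller is expected to hold the lock that protects *pending. */
int mark_read(unsigned int *pending, unsigned int id)
{
    *pending &= ~(1u << id);
    return *pending == 0;
}
```

For three readers the mask starts as 0x7, and only the third call to `mark_read` (in any order of ids) returns 1. That call is the point where the code above posts semEmptyNode for the slot.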
    

    【Comments】:

      【Answer 3】:

      Yes, we can use a named semaphore between two processes. We open it by name, much like opening a file.

      char sem_open_file[128];
      sem_t *ret_value;

      snprintf(sem_open_file, sizeof(sem_open_file), "/sem_16");
      /* First try to open an existing semaphore. */
      ret_value = sem_open(sem_open_file, 0);
      
      if (ret_value == SEM_FAILED) {
          /*
           * No such file or directory: create it with initial value 1.
           */
          if (errno == ENOENT) {
              ret_value = sem_open(sem_open_file,
                                 O_CREAT | O_EXCL, 0777, 1);
          }
      }
      

      Other processes can then use it.
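One caveat with the open-then-create order: if two processes start at the same moment, both can see ENOENT, and the slower one then fails with EEXIST. Here is a sketch of the reverse order, which avoids that by trying to create first and falling back to a plain open. The helper name `open_or_create_sem` and its `created` out-parameter are my own invention. If you don't need to know who created the semaphore, a single `sem_open(name, O_CREAT, mode, value)` without O_EXCL already opens or creates it in one step.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>

/* Hypothetical helper: creates the semaphore if it does not exist yet,
 * otherwise opens the existing one. *created (if not NULL) tells the
 * caller which of the two happened. Returns SEM_FAILED on error. */
sem_t *open_or_create_sem(const char *name, unsigned int initial,
                          int *created)
{
    /* O_EXCL makes creation atomic: exactly one process can win. */
    sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0600, initial);
    if (sem != SEM_FAILED) {
        if (created)
            *created = 1;
        return sem;
    }
    if (errno != EEXIST)
        return SEM_FAILED;      /* a real error, not just "already there" */

    if (created)
        *created = 0;
    return sem_open(name, 0);   /* somebody else created it: just open */
}
```

The process that gets `created == 1` is a natural candidate for calling `sem_unlink` at shutdown.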

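      【Comments】:

        【Answer 4】:

        I really doubt that the shared memory approach will work.

        AFAIK a semaphore is really just a handle that is valid only in the process that opened it. The real semaphore information lives in kernel memory.

        So using the same handle value in another process won't work.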

        【Comments】:

        • Thanks for your answer. Could you compare it with: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attr); pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, attr);
        • This answer is completely wrong: semaphores can be shared between processes. Check the documentation for the second argument of sem_init.
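Regarding the mutex variant asked about in the comment above: a pthread mutex can also be shared between processes, provided it lives in shared memory and its attribute is set to PTHREAD_PROCESS_SHARED. Here is a minimal sketch along the lines of the commenter's snippet, with the `shm_open` arguments in the right order and the attribute passed by address. The names `struct shared_counter`, `map_shared` and `pshared_mutex_demo` are hypothetical, and it assumes forked children that inherit the mapping.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

struct shared_counter {
    pthread_mutex_t lock;   /* must live inside the shared mapping */
    int value;
};

/* Hypothetical helper: maps a fresh shared-memory object of `size` bytes. */
static void *map_shared(const char *name, size_t size)
{
    shm_unlink(name);       /* ignore failure: drops a stale object, if any */
    int fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (fd == -1)
        return MAP_FAILED;
    void *p = MAP_FAILED;
    if (ftruncate(fd, (off_t)size) == 0)
        p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    shm_unlink(name);       /* forked children inherit the mapping */
    return p;
}

/* Returns the final counter value, or -1 on error. */
int pshared_mutex_demo(const char *shm_name, int nchildren, int iters)
{
    struct shared_counter *sc = map_shared(shm_name, sizeof *sc);
    if (sc == MAP_FAILED)
        return -1;

    pthread_mutexattr_t attr;
    if (pthread_mutexattr_init(&attr) != 0) {
        munmap(sc, sizeof *sc);
        return -1;
    }
    int rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_mutex_init(&sc->lock, &attr);
    pthread_mutexattr_destroy(&attr);
    if (rc != 0) {
        munmap(sc, sizeof *sc);
        return -1;
    }
    sc->value = 0;

    int forked = 0;
    for (int i = 0; i < nchildren; i++) {
        pid_t pid = fork();
        if (pid < 0)
            break;
        if (pid == 0) {
            for (int j = 0; j < iters; j++) {
                pthread_mutex_lock(&sc->lock);
                sc->value++;                    /* critical section */
                pthread_mutex_unlock(&sc->lock);
            }
            _exit(0);
        }
        forked++;
    }
    while (wait(NULL) > 0)
        ;                                       /* reap every child */

    int result = (forked == nchildren) ? sc->value : -1;
    pthread_mutex_destroy(&sc->lock);
    munmap(sc, sizeof *sc);
    return result;
}
```

With 4 children and 1000 iterations each, the expected return value is 4000. Compared with `sem_init(sem, 1, 1)`, the only extra work is the attribute object. Linking may need `-pthread -lrt` on older glibc versions.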