【问题标题】:Pthreads signals getting lost when used as IPC inLinux在 Linux 中用作 IPC 时 Pthreads 信号丢失
【发布时间】:2015-05-28 03:41:57
【问题描述】:

我们正在为台式计算机开发一个简单的 Linux 应用程序。方案很简单,如下:

  1. 处理外部世界接口(定期提供一些数据)并产生并跟踪子进程的主进程。
  2. 处理这些数据并不时向主进程报告的各种子进程。

从外部世界接口传入的数据以大约 240 KB 的块的形式出现,1 个块以大约每毫秒一次的速率传入。所有子进程都使用和处理相同的数据,即完整的块在到达时被发送到每个子进程。

子进程的数量不固定,可以从4个到20个不等。进程间通信采用的方案如下:

  1. 能够保存多个数据块的共享内存由所有进程使用公共密钥并使用 shmget() 和 shmat() API 创建。主进程写入该共享内存,所有子进程从该内存读取。
  2. 为了通知子进程新数据块已经到达,我使用了 pthread_cond_broadcast() API。用于此目的的条件变量和相应的互斥锁驻留在一个小的单独共享内存中,并在主进程中初始化为默认属性。

因此,每当新数据块到达时(大约每 1 毫秒一次),主进程调用 pthread_cond_broadcast(),等待 pthread_cond_wait() 的子进程从共享内存中读取这些数据并进行处理。我面临的问题是:

根据处理器负载,有时 pthread 信号会丢失,即要么只传递给一些等待的子进程,要么不传递给等待的子进程。这会严重影响数据处理,因为数据连续性丢失(并且子进程甚至都不知道)。子进程的处理时间平均为 300 微秒,并且此应用程序在多核处理器上运行。

为了解决这个问题,我什至创建了一个虚拟应用程序,其中包含 1 个主进程和几个虚拟子进程,它们除了等待 pthread_cond_wait() 之外什么都不做。从主进程中,我每 1 毫秒调用一次 pthread_cond_broadcast 并增加并打印一个计数,类似地,每次在子进程中接收到一个 pthread 信号时,都会增加并打印另一个计数。当我运行这个测试程序时,我发现一段时间后接收者的计数开始落后于发送者的计数,并且他们之间的计数差距继续增加。我的理解是否正确,这是由于某些 pthread 信号未传递造成的?还有其他快速且安全的 IPC 机制吗?

我什至尝试使用互联网域套接字在广播中使用 UDP 数据报进行相同的操作(仅用于同步目的,同时仍从共享内存中读取数据)。但在这里我也注意到随着子进程数量的增加,同步信号正在丢失。请给出你的想法和想法。

考虑如下测试程序:

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>

#define     SHM_KEY             3579
#define     NumOfChildProc      20

int Packets_Tx = 0, Packets_Rx = 0;

void ChildProc(void)
{
    /* Create the shared memory with same key as SHM_KEY
     * Declare the condition and mutex and assign them the shared memory   
       address */

     while(1)
     {
         pthread_mutex_lock(PTMutex);
         pthread_cond_wait(PTCond, PTMutex);
         pthread_mutex_unlock(PTMutex);

         printf("From CP [%d]: Packets Received = %d\n",getpid(), Packets_Rx++);
    }
}

 int main(void)
 {
    int     shmid, i;
    pid_t   l_pid;
    char*   SigBlock;

    pthread_condattr_t  condattr;
    pthread_mutexattr_t mutexattr;
    pthread_cond_t*     PTCond;
    pthread_mutex_t*    PTMutex;

    shmid = shmget(SHM_KEY, (sizeof(pthread_cond_t) + sizeof(pthread_mutex_t)), IPC_CREAT | 0666);
    if(shmid < 0)
    {
        perror("shmget");
    }

    SigBlock = (char *)shmat(shmid, NULL, 0);
    if(SigBlock == (char *) -1)
    {
        perror("shmat");
    }

    PTCond      = (pthread_cond_t*) SigBlock;
    PTMutex     = (pthread_mutex_t*)(SigBlock + sizeof(pthread_cond_t));

    pthread_condattr_init(&condattr);
    pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(PTCond, &condattr);
    pthread_condattr_destroy(&condattr);

    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(PTMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

    for(i=0; i<NumOfChildProc; i++)
    {
         l_pid = fork();
         if(l_pid == 0)
              ChildProc();
    }
    sleep(1);

    while(1)
    {
        /* Send pthread broadcast and increment the packets count */
        printf("From Main Process : Packets Sent = %d\n", Packets_Tx++);
        pthread_cond_broadcast(PTCond);
        usleep(1000);
    }
}

【问题讨论】:

  • 欢迎来到 stackoverflow.com。请花一些时间阅读the help pages,尤其是名为"What topics can I ask about here?""What types of questions should I avoid asking?" 的部分。也请read about how to ask good questions。您可能还想了解如何创建Minimal, Complete, and Verifiable Example
  • 您还没有真正提出可以自信回答的问题。然而,我猜想问题在于,当您调用pthread_cond_broadcast() 时,有时没有进程等待您的条件变量。在这种情况下,调用无效。当系统负载更重并且每个进程在信号之间执行更多工作时,您可以预期这种情况会更频繁地发生,但您需要做好准备,即使在微不足道的情况下也会偶尔发生。
  • 假设确实是当新数据块到达时有时没有等待进程的问题,问题实际上是您的整体设计无法适应这种可能性,而不是pthread_cond_broadcast()不可靠。我无法建议替代方案,因为我不了解您的处理模型。似乎您希望招募所有可用的进程来处理每个新数据块,但是为什么这无关紧要(假设它至少为 1)?这是阻止您拥有经典的生产者/消费者模型的主要因素。
  • 您确定可以跨进程边界使用pthread_cond_broadcast() 吗?我的理解是它用于线程之间的通信。
  • @juhist:条件变量可以跨进程边界使用,只要它们是使用设置为PTHREAD_PROCESS_SHARED 的进程共享属性创建的。见pthread_condattr_setpshared()

标签: c linux pthreads ipc


【解决方案1】:

pthread_cond_broadcast() 信号不会“丢失”。在发送广播时正在等待 pthread_cond_wait() 调用的每个线程都将被唤醒 - 您的问题几乎可以肯定是每个线程在此时等待 pthread_cond_wait() 调用其中pthead_cond_broadcast() 被调用 - 发送广播时,某些线程可能仍在处理最后一批数据,在这种情况下,它们将“错过”广播。

pthread 条件变量应始终与共享状态的合适条件(或谓词)配对,并且线程仅应在检查该谓词的状态后调用pthread_cond_wait()

例如,在您的情况下,您可能有一个共享变量,它是最新到达的块的块号。在主线程中,它会在广播条件变量之前增加 this(同时持有互斥锁):

pthread_mutex_lock(&lock);
latest_block++;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);

在工作线程中,每个线程都会在局部变量中跟踪它处理的最后一个块,并在调用pthread_cond_wait()之前检查是否有另一个块到达:

pthread_mutex_lock(&lock);
while (latest_block <= my_last_block)
    pthread_cond_wait(&cond, &lock);
pthread_mutex_unlock(&lock);

这将导致工作线程等待,直到主线程增加 latest_block 大于 my_last_block(此工作线程处理的最后一个块)。


您的示例测试代码有同样的问题 - 当子线程锁定或解锁互斥锁时,或在 printf() 调用内部时,主线程迟早会调用 pthread_cond_broadcast()

您的示例代码的一个版本,更新为使用我提到的修复,没有显示这个问题:

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <pthread.h>

#define     SHM_KEY             9753
#define     NumOfChildProc      20

int Packets_Tx = 0, Packets_Rx = 0;

struct {
    pthread_cond_t     PTCond;
    pthread_mutex_t    PTMutex;
    int last_packet;
} *shared_data;

void ChildProc(void)
{
    int my_last_packet = 0;
    /* Create the shared memory with same key as SHM_KEY
     * Declare the condition and mutex and assign them the shared memory
       address */

     while(1)
     {
         pthread_mutex_lock(&shared_data->PTMutex);
         while (shared_data->last_packet <= my_last_packet)
             pthread_cond_wait(&shared_data->PTCond, &shared_data->PTMutex);
         pthread_mutex_unlock(&shared_data->PTMutex);

         printf("From CP [%d]: Packets Received = %d\n",getpid(), Packets_Rx++);
         my_last_packet++;
    }
}

int main(void)
{
    int     shmid, i;
    pid_t   l_pid;

    pthread_condattr_t  condattr;
    pthread_mutexattr_t mutexattr;

    shmid = shmget(SHM_KEY, sizeof *shared_data, IPC_CREAT | 0666);
    if(shmid < 0)
    {
        perror("shmget");
    }

    shared_data = shmat(shmid, NULL, 0);
    if(shared_data == (void *) -1)
    {
        perror("shmat");
    }

    pthread_condattr_init(&condattr);
    pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&shared_data->PTCond, &condattr);
    pthread_condattr_destroy(&condattr);

    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&shared_data->PTMutex, &mutexattr);
    pthread_mutexattr_destroy(&mutexattr);

    shared_data->last_packet = 0;

    for(i=0; i<NumOfChildProc; i++)
    {
         l_pid = fork();
         if(l_pid == 0)
              ChildProc();
    }
    sleep(1);

    while(1)
    {
        /* Send pthread broadcast and increment the packets count */
        printf("From Main Process : Packets Sent = %d\n", Packets_Tx++);
        pthread_mutex_lock(&shared_data->PTMutex);
        shared_data->last_packet++;
        pthread_cond_broadcast(&shared_data->PTCond);
        pthread_mutex_unlock(&shared_data->PTMutex);
        usleep(30);
    }
}

【讨论】:

  • Thankscaf 和 @John Bollinger 为您的 cmets。我确实考虑过发送 pthread_cond_broadcast 时子进程正忙于做其他事情的可能性。为了检查这一点,我创建了一个测试程序(在上面的原始问题中添加)。生产者每 1 毫秒发送一次 pthread 广播,而消费者除了等待 pthread_cond 之外什么都不做。当我执行此操作时,我观察到同样的事情,即消费者的数据包计数在一段时间后开始落后于生产者的数据包计数。
  • @Adhi:您的测试程序有完全相同的问题 - 主程序迟早会在子进程锁定或解锁互斥锁或在@中执行时调用pthread_cond_broadcast() 987654335@。这根本不是使用条件变量的正确方法。
  • 我已经用你的示例代码版本更新了我的问题,并应用了我建议的修复。
  • 感谢 caf,这真的很有帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-08-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-24
  • 1970-01-01
相关资源
最近更新 更多