【问题标题】:Multiple producer single consumer with Circular Buffer具有循环缓冲区的多生产者单消费者
【发布时间】:2012-10-25 13:43:06
【问题描述】:

需要帮助才能完成以下工作。

我有多个生产者线程(每个线程都写 100 字节的数据)到 ringbuffer。 而一个单一的阅读器(消费者)线程,一次读取 100 个字节并写入标准输出。(最后我想根据数据写入文件)

使用这个实现,我有时会从环形缓冲区读取数据错误。见下文 由于环形缓冲区很小,它会变满并且部分数据丢失。这不是我目前的问题。

** 问题:

  1. 在打印从 ringbuffer 读取的数据时,一些数据得到 互换!!我找不到错误。
  2. 逻辑/方法是否正确? (或)有没有 更好的方法来做到这一点

ringbuffer.h

#define RING_BUFFER_SIZE  500
struct ringbuffer
{
    char *buffer;
    int wr_pointer;
    int rd_pointer;
    int size;
    int fill_count;
};

ringbuffer.c

#include <stdio.h>
#include <stdlib.h> 
#include <string.h>
#include "ringbuffer.h"

int init_ringbuffer(char *rbuffer, struct ringbuffer *rb, size_t size)
{
    rb->buffer = rbuffer;
    rb->size = size;
        rb->rd_pointer = 0;
        rb->wr_pointer = 0; 
        rb->fill_count = 0;
    return 0;
}

int rb_get_free_space (struct ringbuffer *rb)
{ 
    return (rb->size -  rb->fill_count);
}

int rb_write (struct ringbuffer *rb, unsigned char * buf, int len)
{
    int availableSpace;
    int i;

    availableSpace = rb_get_free_space(rb);
    printf("In Write AVAIL SPC=%d\n",availableSpace);
    /* Check if Ring Buffer is FULL */
    if(len > availableSpace)
    {
       printf("NO SPACE TO WRITE - RETURN\n");
       return -1;
    }

    i = rb->wr_pointer;
    if(i == rb->size) //At the end of Buffer 
    {
       i = 0;
    }    
    else if (i + len > rb->size)
    {
        memcpy(rb->buffer + i, buf, rb->size - i);
        buf += rb->size - i;
        len = len - (rb->size - i);
        rb->fill_count += len;
        i = 0;
    }
    memcpy(rb->buffer + i, buf, len);
    rb->wr_pointer = i + len;
    rb->fill_count += len;

    printf("w...rb->write=%tx\n", rb->wr_pointer );
    printf("w...rb->read=%tx\n", rb->rd_pointer );
    printf("w...rb->fill_count=%d\n", rb->fill_count );
    return 0;
}

int rb_read (struct ringbuffer *rb, unsigned char * buf, int max)
{
    int i;

    printf("In Read,Current DATA size in RB=%d\n",rb->fill_count);
    /* Check if Ring Buffer is EMPTY */
    if(max > rb->fill_count) 
    {
      printf("In Read, RB EMPTY - RETURN\n");
      return  -1; 
    }  

    i = rb->rd_pointer;
    if (i == rb->size)
    {
       i = 0;
    }
    else if(i + max > rb->size)
    {
        memcpy(buf, rb->buffer + i, rb->size - i);
        buf += rb->size - i;
        max = max - (rb->size - i);
        rb->fill_count -= max;
        i = 0;
    }
    memcpy(buf, rb->buffer + i, max);
    rb->rd_pointer = i + max;
    rb->fill_count -= max;

    printf("r...rb->write=%tx\n", rb->wr_pointer );
    printf("r...rb->read=%tx\n", rb->rd_pointer );
    printf("DATA READ ---> %s\n",(char *)buf);
    printf("r...rb->fill_count=%d\n", rb->fill_count );
    return 0;
}

【问题讨论】:

    标签: c multithreading synchronization pthreads mutex


    【解决方案1】:

    在生产者处,您还需要等待 has empty space 条件的条件变量。这两个条件变量都应该无条件地发出信号,即当消费者从环形缓冲区中删除一个元素时,它应该向生产者发出信号;当生产者将某些东西放入缓冲区时,它应该向消费者发出信号。 此外,我会将这个等待/信令逻辑移动到 rb_read 和 rb_write 实现中,因此您的环形缓冲区对于您的程序的其余部分来说是一个“完整的使用解决方案”。

    【讨论】:

    • 我很好,在调用 rb_write() 并且缓冲区已满时。我想获得正确的同步,以便在阅读时获得正确的值。
    【解决方案2】:

    关于你的问题—— 1. 我也找不到那个错误——事实上,我试过你的代码并没有看到那个行为。 2. 你问这是否逻辑/方法正确——好吧,就目前而言,这确实实现了一种环形缓冲区。您的测试用例恰好是大小的整数倍,并且记录大小是恒定的,所以这不是最好的测试。

    在尝试您的代码时,我发现有很多线程饥饿——第一个运行的生产者线程(最后一个创建的)非常困难,在第一次 5 次之后尝试和失败将东西填充到缓冲区中,不给消费者线程运行(甚至启动)的机会。然后,当消费者线程启动时,它会在释放 cpu 之前保持相当长的一段时间,并且下一个生产者线程最终会启动。这就是它在我的机器上的工作方式——我敢肯定,它在不同的机器上会有所不同。

    太糟糕了,您当前的代码没有办法结束——创建 10 或 100 兆的文件......很难涉足。

    【讨论】:

    • 感谢您的尝试。这种行为偶尔会发生一次。(或)您可能不得不慢慢浏览日志,这就是我发现它的方式。我与饥饿没有太大关系,因为它取决于操作系统来安排。我忘记添加程序将无限运行的注释。
    • 您可以通过在每个循环中添加 uSleep(0) 来改进调度(在我看来)。这将使操作系统有机会进行调度。
    【解决方案3】:

    (作者可能会晚一点,但如果其他人搜索“多生产者单消费者”)

    我认为该实现中的基本问题是 rb_write 修改了全局状态(rb->fill_count 和其他 rb->XX),而无需在多个写入器之间进行任何同步。

    有关其他想法,请查看:http://www.linuxjournal.com/content/lock-free-multi-producer-multi-consumer-queue-ring-buffer

    【讨论】:

      猜你喜欢
      • 2019-06-13
      • 2017-04-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-29
      • 2011-03-21
      • 1970-01-01
      相关资源
      最近更新 更多