【问题标题】:Optimizing a shared buffer in a producer/consumer multithreaded environment在生产者/消费者多线程环境中优化共享缓冲区
【发布时间】:2011-02-15 21:08:17
【问题描述】:

我有一个项目,其中我有一个将事件写入缓冲区的单个生产者线程,以及一个从缓冲区获取事件的附加单个消费者线程。我的目标是针对单个双核机器优化这个东西,以实现最大吞吐量。

目前,我正在使用一些简单的无锁环形缓冲区(无锁是可能的,因为我只有一个消费者和一个生产者线程,因此指针仅由单个线程更新)。

#define BUF_SIZE 32768

struct buf_t { volatile int writepos; volatile void * buffer[BUF_SIZE]; 
    volatile int readpos;) };

void produce (buf_t *b, void * e) {
    int next = (b->writepos+1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[b->writepos] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    while (b->readpos == b->writepos); // nothing to consume. wait
    int next = (b->readpos+1) % BUF_SIZE;
    void * res = b->buffer[b->readpos]; b->readpos = next;
    return res;
}

buf_t *alloc () {
    buf_t *b = (buf_t *)malloc(sizeof(buf_t));
    b->writepos = 0; b->readpos = 0; return b;
}

但是,这个实现还不够快,应该进一步优化。我尝试了不同的BUF_SIZE 值并获得了一些加速。另外,我将writepos 移到buffer 之前和readpos 之后buffer 之后,以确保这两个变量位于不同的缓存行上,这也带来了一定的速度。

我需要的是大约 400 % 的加速。你有什么想法我可以使用填充等来实现这一点吗?

【问题讨论】:

  • “无锁是可能的,因为我只有一个消费者和一个生产者线程” - 如果消费者和生产者线程冲突会发生什么?
  • 忙等待中消耗了多少 CPU?
  • 忙等待???如果你有忙等待,你不会问如​​何让它更快。
  • 如果我错了,请纠正我,但如果您有多个 CPU,您可能需要在生产和消费之间设置内存屏障。这会减慢您的速度(可能很多),但没有它,消费者可能会在数据写入缓冲区之前看到写入索引移动,并检索无效数据。
  • “单机”是什么意思?线程在一个进程内,通常一个进程在一台且仅一台机器上运行。通常的问题是您有多少个 CPU 内核,因为 MT 错误通常只出现在多核场景中,其中工作实际上是并行完成的,并且存在内存缓存问题。

标签: c performance multithreading caching shared-memory


【解决方案1】:

这是我可以看到的一项优化:在consume() 中,您不需要连续获取b->readpos,因为调用consume() 的线程是唯一可以更新它的线程。因为它是volatile,编译器无法优化所有这些提取,所以你需要明确地这样做:

void * consume (buf_t *b) {
    int rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    int next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[rp]; b->readpos = next;
    return res;
}

您还应该以至少一个缓存线的步幅逐步遍历缓冲区,否则您将在两个 CPU 之间获得缓存线乒乓球(因为一个 CPU 希望缓存线读取 b->buffer[n] 并且 16 次中有 15 次另一个使其无效以写入b->buffer[n+1])。例如:

#define STRIDE 16
#define STEPS 2048
#define BUF_SIZE (STRIDE * STEPS)

#define TO_INDEX(n) (STRIDE * (((n) + 1) % STEPS) + (((n) + 1) / STEPS))

void produce (buf_t *b, void * e) {
    unsigned wp = b->writepos;
    unsigned next = (wp + 1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[TO_INDEX(wp)] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    unsigned rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    unsigned next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[TO_INDEX(rp)]; b->readpos = next;
    return res;
}

无论如何都值得一试。 (请注意,只要 STRIDESTEPS 是 2 的幂,TO_INDEX() 中看起来吓人的除法和模数就可以优化为移位和按位与,但前提是操作数是 unsigned - 因此我建议相应地更改这些类型)。

【讨论】:

    【解决方案2】:

    我假设您使用的机器具有多个处理器或内核。如果不是,那么你忙碌的等待会伤害到事情。如果您在一个操作系统下运行,该操作系统决定您睡眠不足并降低您的动态优先级并且还有其他程序正在运行,那么它们无论如何都可能受到伤害。

    您需要收集有关缓冲区已满的数据。在某个时候,太大也会开始损害您的缓存。

    如果您使用全局数组而不是从堆中分配它,那么指向缓冲区的指针将变为指针文字,并且两个线程都不必从缓存中的同一位置读取该指针值,因为它只会被推进入代码。

    如果吞吐量对您来说很重要(以延迟为代价)并且缓存确实发挥了重要作用,那么您可能会考虑让消费者落后于生产者,这样他们就不会尝试从同一个文件中读取和写入放入缓冲区。

    您可能希望更改消费者函数的接口,以便它可以在缓存大小(或多个)大小的块中消费(这对消费者滞后于我上面提出的生产者建议很有效)除了个人或部分缓存行块。尽量保持消费缓存对齐。如果您将可用数据视为一条蛇,那么头部和尾部可能未对齐。只有在没有其他数据可供使用时,才应使用未对齐的尾部。如果您可以在调用中使用任何其他数据来使用,那么您应该将尾部留给下一次调用。

    除此之外以及 caf 提到的内容,我不得不怀疑在这段代码之外发生的任何事情都必须发挥更大的作用。

    【讨论】:

      【解决方案3】:

      我已经在 cafs 答案的第一个代码块中实现了优化。他们实际上加快了速度(谢谢),但这还不够。第二个优化导致缓存按列而不是按行填充,导致性能更差。

      消费者落后于生产者的想法并没有带来任何加速。

      现在,我达到了 300%。

      我所做的另一项更改是对 volatile writepos 和 readpos 变量进行了一些修改:

      void produce (void * e) {
          unsigned int oldpos = buffer.writepos;
          unsigned int next = (oldpos+1) % BUF_SIZE;
          while (next == buffer.rpos) { // rpos is not volatile
              buffer.rpos = buffer.readpos;
              usleep(1);
          }
          buffer.buffer[oldpos] = e; buffer.writepos = next;
      }
      

      consume() 类似。

      对结构的额外更改会导致以下新缓冲区结构(在全局范围内,正如在一个答案中建议的那样,而不是在堆上)。

      #define STRIDE 16
      #define STEPS 524288
      
      struct buf_t {
          volatile unsigned int writepos;
          int offset [STRIDE - 1];
          unsigned int wpos;
          int offset2 [STRIDE - 1];
          volatile void * buffer[BUF_SIZE];
          int offset4 [STRIDE];
          volatile unsigned int readpos;
          int offset3 [STRIDE - 1];
          unsigned int rpos;
      }
      

      这提供了 300% 的加速,而这却是缺失的,并将其推到了我必须达到的性能限制之下。

      如果您有一些额外的技巧可以用来进一步提高性能,请不要犹豫也发布它们:-)

      感谢您的帮助。

      【讨论】:

        猜你喜欢
        • 2018-09-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-04-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-03-21
        相关资源
        最近更新 更多