【问题标题】:effective bounds checking on ring buffer环形缓冲区的有效边界检查
【发布时间】:2014-10-17 16:05:38
【问题描述】:

有些人可能已经注意到我正在尝试实现一个环形缓冲区。我希望在不损失太多效率的同时,在数据结构中有一定的安全措施。

我当前的解决方案实现了两种类型的计数器,一种是索引,它是缓冲区内存的直接偏移量,另一种是序列号,它只是一个size_t 类型的计数器。迭代器使用序列号来访问环形缓冲区。因此,环形缓冲区必须在每次缓冲区访问时从序列号转换为缓冲区索引。这通常是相当有效的:

size_t offset = seqNum - m_tailSeq;
size_t index = (m_tailIdx + offset) % m_size;

其中seqNum是要转换的序列号,m_tailSeq是缓冲区中最旧元素的序列号,m_tailIdx是缓冲区中最旧元素的缓冲区索引,m_size是缓冲内存的大小。

但是,如果我向缓冲区添加元素的时间足够长,序列号就会溢出。所以我必须检查一下。当我这样做时,我短暂而甜蜜的转变变成了这个怪物:

size_type getIndex(size_type seqNum) const
{
    size_type headSeq = m_tailSeq + m_numElements;

    // sequence does not wrap around
    if (m_tailSeq < headSeq)
    {
        // bounds check
        if(m_tailSeq <= seqNum && seqNum < headSeq) {
            size_type offset = seqNum - m_tailSeq;
            return (m_tailIdx + offset) % m_size;
        } else {
            throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__);
        }
    }
    // sequence does wrap around
    else if (headSeq < m_tailSeq)
    {
        //bounds check (inverted from above)
        if(seqNum < headSeq) {
            size_type offset = (SIZE_TYPE_MAX - m_tailSeq) + seqNum;
            return (m_tailIdx + offset) % m_size;
        } else if (seqNum >= m_tailSeq) {
            size_type offset = seqNum - m_tailSeq;
            return (m_tailIdx + offset) % m_size;
        } else {
            throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__);
        }
    }
    else if (isEmpty()) {
        throw BaseException("RingBufferIterator: accessing empty buffer", __FILE__, __LINE__);
    }
}

这相当于两个整数加法、一个整数减法、三个整数比较和一个模运算在最好的情况下在单个缓冲区访问。不用说,遍历缓冲区变得非常昂贵。但是,由于我想在高性能场景中使用这个缓冲区(即软实时应用程序中的事件队列),我希望这个数据结构尽可能高效。

当前用例将作为事件缓冲区。一个(或可能不止一个)系统会将事件写入缓冲区,而其他系统(不止一个)将按照自己的节奏处理这些事件而不删除它们。当缓冲区已满时,旧事件将被简单地覆盖。这样,我总是可以记录最后几百个事件,并且不同的系统可以以各自的更新速率检查它们并挑选出与它们相关的事件。不同的系统将保留一个指向环形缓冲区的迭代器,以便它们知道上次中断的位置和恢复的位置。当系统开始处理事件时,它需要确定它的迭代器是否仍然有效或是否已被覆盖。事件可能一次被大块处理,因此递增和取消引用应该很快。所以基本上我们在潜在的多线程上下文中查看 MPMC 环形缓冲区。

我自己能想出的唯一解决方案是将错误检查的负担转移到缓冲区的用户身上。 IE。用户必须首先检查(通过某种方式)其进入缓冲区的迭代器是否有效,确保缓冲区的某一段保持有效,然后在没有任何进一步检查的情况下迭代该段。但是,这似乎很容易出错,因为我必须检查程序多个部分的访问安全性,而不仅仅是一个地方,如果我决定让缓冲区线程安全,它会变得很麻烦。

我错过了什么吗?这可以做得更好吗?我犯了一些初学者的错误吗?

【问题讨论】:

  • 将您的边界检查置于定义下,根据调试模式是否可以启用/禁用。例如,只有 vector::at 保证边界检查。但是许多编译器在调试模式下边界检查 vector::operator[]。
  • 看看网络协议中的序列号。 C++ 中明确定义的无符号整数溢出。
  • 您可能不想使用异常。此外,分支指令可能比指令更昂贵。除法/模数也可能很昂贵。最好的建议可能是进行准确的基准测试。您可能想查看ring buffer 的开源实现,以了解实现它们的不同方法。
  • @Jason,我知道分支可能非常昂贵,但我还没有找到没有它们的方法。如果你有任何提示,我很乐意听到。我还查看了几个环形缓冲区实现,但没有查看您提到的,因为我将自己限制在 C++ 中。不过我会读的。我看过的大多数实现根本不做边界检查。我开始怀疑是否有一种有效的方法来实现它,或者我是否必须简单地相信用户会进行适当的边界检查......
  • @sseeland 除了禁止用户按设计索引之外,没有免费的方法来进行边界检查(我知道的大多数环形缓冲区都没有)。还有一些微优化可以避免除法/模数开销,但您应该首先担心使用异常。

标签: c++ optimization containers


【解决方案1】:

正如我在评论 unsigned integer overflow is well defined operation 中提到的。在 C++ 中实现高效的序列号是关键。所以我们可以简单地减去两个无符号整数来得到距离。然后只需将距离转发到通过边界检查通过索引实现访问的函数。与往常一样,它会在所有可能的索引低于序列号最大值的一半时工作。

#include <array>
#include <climits>
#include <iostream>

unsigned int const SEQUENCE_NUMBER_FIRST = UINT_MAX-10;

class RingBuffer
{
public:
    void PushBack( char c )
    {
        GetBySeqNumber(m_tailSeq++) = c;
        if( Size() == m_buffer.size()+1 )
            PopFront();
    }
    void PopFront()
    {
        ++m_headSeq;
        if( ++m_offset % m_buffer.size() == 0 )
            m_offset = 0;
    }
    char& GetByIndex( size_t n )
    {
        if( n >= Size() )
            throw std::out_of_range("Hello, world!");
        return m_buffer[ (n+m_offset) % m_buffer.size() ];
    }
    char& GetBySeqNumber( unsigned int n )
    {
        // It is well defined operation in C++,
        // but if you try to use signed integer
        // it will become undefined behavior
        return GetByIndex( n-m_headSeq );
    }
    size_t Size() const
    {
        return m_tailSeq - m_headSeq;
    }
private:
    size_t m_offset = 0;
    unsigned int m_headSeq = SEQUENCE_NUMBER_FIRST;
    unsigned int m_tailSeq = SEQUENCE_NUMBER_FIRST;
    std::array<char,26> m_buffer;
};

int main()
{
    // initialize
    RingBuffer buf;
    for( char i=0; i<26; ++i )
        buf.PushBack( 'a'+i );

    // access trough sequence numbers
    // add or subtract one to get out of range exception
    for( unsigned int i=0; i<buf.Size(); ++i )
        std::cout << buf.GetBySeqNumber( SEQUENCE_NUMBER_FIRST+i );
    std::cout << std::endl;

    // push some more to overwrite first 10 values
    for( char i=0; i<10; ++i )
        buf.PushBack( '0'+i );

    // access trough indexes
    // add or subtract one to get out of range exception
    for( size_t i=0; i<buf.Size(); ++i )
        std::cout << buf.GetByIndex(i);
    std::cout << std::endl;

    return 0;
}

【讨论】:

  • 恐怕我看不出这是如何工作的。据我所知,它仅在head_ 引用buffer_[0] 时才有效。但是环形缓冲区的想法是数据基本上可以从数组中的任何位置开始,并且只是环绕。如果你继续添加数据,数据的“开始”就会移动。那么您的代码如何处理有效数据从buffer_[5] 开始并回绕到buffer_[4] 的情况?
  • 你没有明白我的意思。当然,您需要通过对GetByIndex() 的范围检查插入正确的实现。所以我提供了完整版的环形缓冲区。
  • 哇,我花了很长时间才看到它,但现在我明白你的意思了!这非常有效!
猜你喜欢
  • 2014-12-09
  • 1970-01-01
  • 1970-01-01
  • 2016-12-09
  • 2012-04-04
  • 1970-01-01
  • 2018-10-29
  • 1970-01-01
相关资源
最近更新 更多