【问题标题】:Ring buffer, 1 writer and N readers环形缓冲区,1 个写入器和 N 个读取器
【发布时间】:2015-01-29 10:53:27
【问题描述】:

我需要一种环形缓冲区(或者一些更合适的数据结构,以防万一)和一种算法/模式来在以下情况下处理环形缓冲区。

连续产生实时数据的 1 个写入器必须始终能够写入第一个空闲“槽”(一个不在读取过程中的槽),或者等到一个槽有空闲用于写入.每当写入器完成将数据写入一个槽时,它就可以为读取器“提交”该槽。

在给定时间,可能有 N 个并发读者。在发出读取请求时,每个读取器都应该始终从环形缓冲区中最后提交的插槽中获取最新写入的数据,但不能多次读取相同的数据。如果自上次读取后写入器尚未在一个槽中写入和提交新数据,则读取器应该等待(想想快速读取器)。

请注意,一位读者不得“消费”另一位读者的数据。换言之,两个不同的读取器可能读取相同的数据。同样,一个读取器可以从同一个槽读取数据两次或更多次,但前提是写入器在两次读取请求之间写入该槽。

请注意,干扰器可能不足以满足我的情况(或者我只是未能使其按我想要的方式工作)。破坏者的问题是写入器可能会前进得如此之快(与较慢的读取器相比),它可能最终会在某些插槽正在被读取时覆盖它们。在这种情况下,写入者应该能够跳过这些“忙碌”的插槽,直到找到第一个空闲插槽(一旦写入,它还必须仅发布该插槽),但破坏者模式不会似乎在考虑这种情况。序列本身还有另一个问题,在我使用的中断器实现中,它是一个原子整数,因此它可能会溢出导致一些未定义的行为。

你有什么想法吗?如果你知道的话,我会很感激现代 C++ 的解决方案。

【问题讨论】:

    标签: multithreading algorithm c++11 design-patterns concurrency


    【解决方案1】:

    我不知道这个问题的开箱即用解决方案(注意:这并不意味着没有一个!)但是如果我尝试使用 c++11 工具来实现它,我会看到类似以下的内容:

    • 保存数据的数组
    • 一个 std::stack 用于保存已写入元素的索引
    • 用于保存已读取元素的索引的 std::queue
    • 用于控制对堆栈的访问的互斥锁。因此,读取器和写入器在弹出/推送到堆栈之前先获取互斥锁。
    • 用于控制对队列的访问的互斥体。因此,读取器和写入器在入队/出队到队列之前获取互斥锁。
    • 当堆栈为空时供读取器阻塞的读取器条件变量
    • 当队列为空时供写入器阻塞的写入器条件变量。

    • 开始将所有索引(环形缓冲区的大小)排入队列。

    • 写入器获取索引并写入数组,然后将索引压入堆栈并通知读取器。
    • 读取器从堆栈中弹出索引,完成后将它们排入队列并通知写入器。
    • 空堆栈导致读取器等待写入器,而空队列导致写入器等待读取器。

    A nice article on the C++11 concurrency features.

    编辑进一步思考这个建议试图做的是将数组中的插槽视为资源池。读取器获取资源、槽的索引,并在完成处理后将其返回。

    更新的解决方案:由于插槽数量限制为最多 8 个,我建议您使解决方案尽可能简单。让每个插槽负责自己的状态:

    #include <atomic>
    template <typename T>
    class Slot {
       atomic_uint readers ;
       int iteration ;
       T data ;
    }
    

    然后创建 Slots 的 std:vector 并让读取器和写入器遍历该向量。

    编写器需要找到readers == 0 的插槽(并且可能具有最小值iteration),当它找到时,它需要减少读取器计数,然后检查读取器计数是否为-1,以确保读者没有在 if 和递减之间开始阅读。如果确实如此,则重新增加 readers 并重新开始。

    Readers 遍历数组以寻找他们尚未读取的最大 iteration 值以及 readers &gt;= 0 的位置。然后他们重新检查 readers 的值,如果 Writer 没有将其设置为 -1,他们会增加它并开始读取。如果 Writer 已开始写入该插槽,则 Reader 将重新开始。

    假设 Writer 已将插槽标识为空闲并且 Reader 已决定需要读取该插槽,有 2 个可能的命令,并且在 Reader 和 Writer 中都退回并重试。

    Execution       Writer                  Reader
    1               readers == 0 
    2                                       readers >= 0
    3               readers--
    4                                       readers++
    5               readers == -1 is false
    6                                       readers > 0 is false!
    7               readers++
    8                                       readers--
    
    
    Execution       Writer                  Reader
    1                                       readers >= 0
    2               readers == 0 
    3                                       readers++
    4               readers--
    5                                       
    6               readers == -1 is false
    7                                       readers > 0 is false
    8               readers++
    9                                       readers--
    

    如果 a Reader 和 Writer 确实以这种方式发生冲突,则两者都无法访问并且都需要重试。

    如果您对这种方法不满意,那么可以使用 Mutex 和 lock/try_lock,在 Slot 类中类似这样的东西(注意:目前无法通过编译器运行它,因此可能存在语法错误)

    typedef enum LOCK_TYPE {READ, WRITE} ;
    std::mutex mtx ;
    
    bool lockSlot(LOCK_TYPE lockType)
    {
        bool result = false ;
        if (mtx.try_lock()) {
    
            if ((lockType == READ) && (readers >= 0)) 
                readers++ ;
            if ((lockType == WRITE) && (readers == 0)) 
                readers-- ;
            result = true ;
        }
        return result ;
    }
    
    void unlockSlot (LOCK_TYPE lockType)
    {
        if (mtx.lock()) {  // Wait for a lock this time
    
            if (lockType == READ) 
                readers-- ;
            if (lockType == WRITE)
                readers++ ;
        }
    }
    

    当/如果阅读器的工作用完,它可以等待一个条件变量,当有新数据可用时,编写器可以使用该条件变量来 NotifyAll() 阅读器。

    【讨论】:

    • 它不会像我想要的那样工作:“读者从堆栈中弹出索引,完成后将它们排入队列并通知作者。”。许多读者可以共享同一个索引。正如我所说,一位读者不能“窃取”(通过弹出)一个索引给后续读者(如果尚未将新索引推送到堆栈中)。
    • @Martin - 我不太确定你的意思。您有固定数量的插槽,而编写器必须等待空闲插槽,那么插槽如何变得空闲?是否所有读者都需要读取插槽中的数据,还是有其他标准?是否允许读者同时或顺序阅读数据或两者兼而有之?我认为你需要更详细地解释你需要什么。
    • 一个给定的读取器不能读取相同的数据两次。但是给定两个阅读器 A 和 B,假设 A 先阅读一些输入。使用您的解决方案,它会从读者队列中弹出。然后 B 要求数据并且无法读取 A 刚刚读取的内容。您需要一些东西来跟踪读者(他们已经阅读了什么?)
    • @Rerito :你明白我的意思。这就是为什么问题比看起来更困难的原因。我想知道为什么对于这个问题显然没有开箱即用的解决方案,1 个写入器(实时)N 个读取器总是获取最新的可能数据,以及写入器不能覆盖正在读取的内容的唯一约束。 (也不能复制数据进行“隔离阅读”,因为复制本身会很广泛)
    • @Jackson 除了 Rerito 所说的,是的,正如我所说,读数可能是并发的。
    【解决方案2】:

    几年前我也遇到过同样的问题(如果我理解正确的话),我有一个解决方案。一篇包含算法的期刊论文已被接受,我会在它发表时在此处提供参考,但现在,我可以将您引导至代码。

    然而,首先,这是在 C 中实时完成的,并且(当前)可能使用 POSIX 后端(在用户空间中)或 RTAI 后端(在用户空间或内核空间中)。

    其次,我的一个要求是作者永远不会因为用户而被阻止。也就是说,如果写入器找不到空闲缓冲区,它会覆盖当前数据帧。我也解决了这个问题,我的测试表明,只要在读者方面进行少量协作,就可以完全避免这种情况。

    还要注意,这个想法类似于“周期性数据缓冲区”1 和“循环异步缓冲区”2,尽管我是自己提出的。他们使用原子操作,而我使用读写锁。此外,它们没有提供我上面提到的“交换跳过”的解决方案。

    最后,作者和读者不仅仅是周期性的。编写者可以是定期的或零星的(根据请求运行),而读者可以是定期的、零星的或尽力而为。

    您可以找到编写器实现here 和读取器实现here

    为了完整起见,我将使用多缓冲区模式下周期性写入器和读取器的基本算法粘贴注释:

    Writer:
    
    write_lock(cur)
    loop {
             if last period no swap
                     try swap again
             write
             try while period left
                     write_lock(next)
                     write_unlock(cur)
                     cur = next
             wait period
    }
    unlock(cur)
    
    Reader:
    
    loop {
            if last is new
                    b = last
            else
                    b = cur
            read_lock(b)
            read
            read_unlock(b)
            wait period
    }
    

    write 上,写入器还会为缓冲区中的数据加上时间戳。这有助于读者了解数据是新的还是旧的。

    关于 writer 中的缓冲区交换的要点是,它会尝试,直到成功或接近其交换缓冲区的周期结束。这是通过try_locking 所有其他缓冲区来完成的,以查看哪些可用(在第一次成功/期末停止)(当然没有忙循环)。完成此操作后,它还会告知(通过共享内存,可能与包含数据缓冲区的内存相同)何时希望再次交换缓冲区,以及哪个缓冲区包含最新数据(last),以及写入者当前控制哪个缓冲区 (cur)。

    然后读者可以选择。如果last 包含新数据,read_lock 它并继续读取。如果不是,或者如果阅读器可以估计它无法及时执行下一次缓冲区交换的读取,它会等待cur 解锁。一旦写入器解锁,实时调度器就会尽快唤醒读取器。如果系统负载不重,这意味着几乎是瞬间的。

    为了让读者知道它是否可以及时执行读取,它会跟踪它在周期中的平均执行时间,并使用它来检查是否current_time + my_execution_time &gt; next_expected_swap。在这种情况下,它无法及时执行读取,因此它将改为等待即将解锁的cur


    1 “HIC:一种用于伺服回路层次结构的操作系统”,Clark, D.,机器人与自动化,1989 年。Proceedings.,1989 年 IEEE 国际会议

    2“HARTIK:机器人应用程序的实时内核”,Buttazzo, G.C.,实时系统研讨会,1993 年,论文集。


    注意:虽然这个库主要是为机器人皮肤数据处理而设计的,但您基本上可以将它用于具有上述属性的任何一般的读写器通信。此外,请注意我指向的存储库的主分支包含旧版本。我指向的分支skinware-2 有一个更好且功能更丰富的实现。

    【讨论】:

    • 谢谢,我需要一些时间来分析您的解决方案。我会尽快通知你。
    猜你喜欢
    • 1970-01-01
    • 2015-04-10
    • 2019-06-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-12-12
    • 1970-01-01
    • 2019-02-28
    相关资源
    最近更新 更多