【问题标题】:efficient circular buffer?有效的循环缓冲区?
【发布时间】:2010-11-11 04:17:18
【问题描述】:

我想在 python 中创建一个高效的circular buffer(目标是取缓冲区中整数值的平均值)。

这是使用列表收集值的有效方法吗?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

什么会更有效(为什么)?

【问题讨论】:

  • 这不是实现循环缓冲区的有效方法,因为 pop(0) 在列表中是 O(n) 操作。 pop(0) 删除列表中的第一个元素,所有元素都必须向左移动。请改用带有 maxlen 属性的 collections.deque。 deque 对于 append 和 pop 有 O(1) 操作。

标签: python circular-buffer


【解决方案1】:

我会使用 collections.dequemaxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

deque 的文档中有一个recipe,与您想要的类似。我断言它是最有效的完全取决于这样一个事实,即它是由一个非常熟练的团队用 C 语言实现的,他们习惯于编写一流的代码。

【讨论】:

  • +1 是的,它包含了不错的电池。循环缓冲区的操作是 O(1),正如你所说的额外开销在 C 中,所以应该还是相当快
  • 我不喜欢这个解决方案,因为当定义 maxlen 时,文档不保证 O(1) 随机访问。当deque 可以增长到无穷大时,O(n) 是可以理解的,但如果给定maxlen,则索引元素应该是常数时间。
  • 我的猜测是它实现为链表而不是数组。
  • 如果我的answer below 中的时间安排正确的话,似乎是正确的。
  • 实际的 python 实现(见这里:github.com/python/cpython/blob/main/Modules/…)使用了两种想法的混合:固定大小块的链表。 IE。它在内部将双端队列分成大小相等的内存块。这将减轻纯链表实现的一些成本,但是长双端队列的内部项目的访问时间仍然是O(maxlen)(又名O(n)
【解决方案2】:

虽然这里已经有很多很好的答案,但我找不到所提到的选项的任何时间直接比较。因此,请在下面的比较中找到我的卑微尝试。

仅出于测试目的,该类可以在基于list 的缓冲区、基于collections.deque 的缓冲区和基于Numpy.roll 的缓冲区之间切换。

请注意,为简单起见,update 方法一次只添加一个值。

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

在我的系统上,这会产生:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

【讨论】:

    【解决方案3】:

    从列表头部弹出会导致整个列表被复制,因此效率低下

    您应该使用固定大小的列表/数组和在添加/删除项目时移动通过缓冲区的索引

    【讨论】:

    • 同意。无论它看起来多么优雅或不雅,或使用何种语言。实际上,您越少打扰垃圾收集器(或堆管理器或分页/映射机制或任何真正的内存魔法)越好。
    • @RocketSurgeon 这不是魔法,只是它是一个删除第一个元素的数组。因此,对于大小为 n 的数组,这意味着 n-1 个复制操作。这里不涉及垃圾收集器或类似设备。
    • 我同意。这样做也比某些人想象的要容易得多。只需使用不断增加的计数器,并在访问项目时使用模运算符 (% arraylen)。
    • 同上,您可以查看我上面的帖子,我就是这样做的
    【解决方案4】:

    基于MoonCactus's answer,这里有一个circularlist 类。与他的版本不同的是,这里c[0] 总是给出最旧的附加元素,c[-1] 是最新附加的元素,c[-2] 倒数第二个......这对于应用程序来说更自然。

    c = circularlist(4)
    c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
    c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
    c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
    c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
    c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
    c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
    d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]
    

    类:

    class circularlist(object):
        def __init__(self, size, data = []):
            """Initialization"""
            self.index = 0
            self.size = size
            self._data = list(data)[-size:]
    
        def append(self, value):
            """Append an element"""
            if len(self._data) == self.size:
                self._data[self.index] = value
            else:
                self._data.append(value)
            self.index = (self.index + 1) % self.size
    
        def __getitem__(self, key):
            """Get element by index, relative to the current index"""
            if len(self._data) == self.size:
                return(self._data[(key + self.index) % self.size])
            else:
                return(self._data[key])
    
        def __repr__(self):
            """Return string representation"""
            return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)
    

    【讨论】:

    • 很好的补充。 Python 列表已经允许负索引,但是 (-1),例如一旦循环缓冲区已满,将不会返回预期值,因为列表中的“最后”添加最终会 列表中。
    • 它确实有效@MoonCactus,请参阅我在答案之上给出的 6 个示例;在最后一个中,您可以看到 c[-1] 始终是正确的元素。 __getitem__ 做得对。
    • 哦,是的,我的意思是 mine 失败了,不是你的,抱歉 :D 我会让我的评论更清楚! -- 哦,我不能,评论太旧了。
    • 不错的简单解决方案。我添加了一个可选参数以允许从现有数据初始化列表,这样更pythonpathetic。
    • 除了计算处理的项目数之外,还有没有办法只迭代这个循环列表的成员一次? for i in c: 永远迭代......谢谢 – Mike T.
    【解决方案5】:

    可以使用 deque 类,但对于问题的要求(平均),这是我的解决方案:

    >>> from collections import deque
    >>> class CircularBuffer(deque):
    ...     def __init__(self, size=0):
    ...             super(CircularBuffer, self).__init__(maxlen=size)
    ...     @property
    ...     def average(self):  # TODO: Make type check for integer or floats
    ...             return sum(self)/len(self)
    ...
    >>>
    >>> cb = CircularBuffer(size=10)
    >>> for i in range(20):
    ...     cb.append(i)
    ...     print "@%s, Average: %s" % (cb, cb.average)
    ...
    @deque([0], maxlen=10), Average: 0
    @deque([0, 1], maxlen=10), Average: 0
    @deque([0, 1, 2], maxlen=10), Average: 1
    @deque([0, 1, 2, 3], maxlen=10), Average: 1
    @deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
    @deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
    @deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
    @deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
    @deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
    @deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
    @deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
    @deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
    @deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
    @deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
    @deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
    @deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
    @deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
    @deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
    @deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
    @deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
    

    【讨论】:

    • 我在尝试调用average 方法时得到TypeError: 'numpy.float64' object is not callable
    • 是的......事实上我猜双端队列在内部使用了 numpy 数组(删除 @property 后它工作正常)
    • 我保证 deque 在内部不使用 numpy 数组。 collections 是标准库的一部分,numpy 不是。对第三方库的依赖会导致糟糕的标准库。
    【解决方案6】:

    Python 的双端队列很慢。您也可以使用 numpy.roll 代替 How do you rotate the numbers in an numpy array of shape (n,) or (n,1)?

    在这个基准测试中,双端队列为 448 毫秒。 Numpy.roll 为 29 毫秒 http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

    【讨论】:

    • 但是numpy.roll返回了一个数组的副本,对吧?
    • 这个答案非常具有误导性 - Python 的双端队列似乎非常快,但在您链接到的基准测试中,从和转换为 numpy 数组会大大减慢它。
    • -1 从source of numpy.roll 中可以看出,它计算旧数组的前半部分和后半部分的切片,创建一个新数组(使用empty_like),然后复制交换的部分到新的。这是 O(n),因为您总是复制整个数组。
    【解决方案7】:

    the solution from the Python Cookbook 怎么样,包括环形缓冲区实例满时的重新分类?

    class RingBuffer:
        """ class that implements a not-yet-full buffer """
        def __init__(self,size_max):
            self.max = size_max
            self.data = []
    
        class __Full:
            """ class that implements a full buffer """
            def append(self, x):
                """ Append an element overwriting the oldest one. """
                self.data[self.cur] = x
                self.cur = (self.cur+1) % self.max
            def get(self):
                """ return list of elements in correct order """
                return self.data[self.cur:]+self.data[:self.cur]
    
        def append(self,x):
            """append an element at the end of the buffer"""
            self.data.append(x)
            if len(self.data) == self.max:
                self.cur = 0
                # Permanently change self's class from non-full to full
                self.__class__ = self.__Full
    
        def get(self):
            """ Return a list of elements from the oldest to the newest. """
            return self.data
    
    # sample usage
    if __name__=='__main__':
        x=RingBuffer(5)
        x.append(1); x.append(2); x.append(3); x.append(4)
        print(x.__class__, x.get())
        x.append(5)
        print(x.__class__, x.get())
        x.append(6)
        print(x.data, x.get())
        x.append(7); x.append(8); x.append(9); x.append(10)
        print(x.data, x.get())
    

    在实现中值得注意的设计选择是,因为这些 对象在某个时刻经历不可逆的状态转换 它们的生命周期——从非完整缓冲区到完整缓冲区(以及行为 在那一点上发生变化)-我通过更改self.__class__来建模。 这甚至在 Python 2.2 中也有效,只要两个类具有相同的 插槽(例如,它适用于两个经典类,例如 RingBuffer 和 __Full 在这个秘籍中)。

    在许多语言中更改实例的类可能很奇怪, 但它是其他表示方式的 Pythonic 替代方案 偶尔的、大规模的、不可逆的和离散的状态变化 极大地影响行为,就像在这个食谱中一样。 Python的好东西 支持各种类。

    图片来源:Sébastien Keim

    【讨论】:

    • 我对这个与双端队列进行了一些速度测试。这比双端队列慢了大约 7 倍。
    • @PolyMesh 太棒了,你应该让作者知道!
    • 这样做有什么意义?这是一个旧的已发布文件。我的评论的重点是让其他人知道这个答案已经过时并使用 deque 代替。
    • @PolyMesh 发布时可能还慢;联系作者的说明在本书的介绍中。我只是在谈论一个可能的替代方案。此外,“如果只有速度是最好的指标;唉,它可能只是一个好指标。”
    • @d8aninja deque 也有一个 .clear() 函数。这不是(据我所知,不能)
    【解决方案8】:

    你也可以看到这个很老的Python recipe

    这是我自己的带有 NumPy 数组的版本:

    #!/usr/bin/env python
    
    import numpy as np
    
    class RingBuffer(object):
        def __init__(self, size_max, default_value=0.0, dtype=float):
            """initialization"""
            self.size_max = size_max
    
            self._data = np.empty(size_max, dtype=dtype)
            self._data.fill(default_value)
    
            self.size = 0
    
        def append(self, value):
            """append an element"""
            self._data = np.roll(self._data, 1)
            self._data[0] = value 
    
            self.size += 1
    
            if self.size == self.size_max:
                self.__class__  = RingBufferFull
    
        def get_all(self):
            """return a list of elements from the oldest to the newest"""
            return(self._data)
    
        def get_partial(self):
            return(self.get_all()[0:self.size])
    
        def __getitem__(self, key):
            """get element"""
            return(self._data[key])
    
        def __repr__(self):
            """return string representation"""
            s = self._data.__repr__()
            s = s + '\t' + str(self.size)
            s = s + '\t' + self.get_all()[::-1].__repr__()
            s = s + '\t' + self.get_partial()[::-1].__repr__()
            return(s)
    
    class RingBufferFull(RingBuffer):
        def append(self, value):
            """append an element when buffer is full"""
            self._data = np.roll(self._data, 1)
            self._data[0] = value
    

    【讨论】:

    • +1 用于使用 numpy,但 -1 用于不实现循环缓冲区。您实现它的方式是,每次添加单个元素时都会移动所有数据,这会花费O(n) 时间。要实现正确的circular buffer,您应该同时拥有一个索引和一个大小变量,并且您需要正确处理数据“环绕”缓冲区末端的情况。检索数据时,您可能必须在缓冲区的开头和结尾连接两个部分。
    【解决方案9】:

    来自 Github:

    class CircularBuffer:
    
        def __init__(self, size):
            """Store buffer in given storage."""
            self.buffer = [None]*size
            self.low = 0
            self.high = 0
            self.size = size
            self.count = 0
    
        def isEmpty(self):
            """Determines if buffer is empty."""
            return self.count == 0
    
        def isFull(self):
            """Determines if buffer is full."""
            return self.count == self.size
    
        def __len__(self):
            """Returns number of elements in buffer."""
            return self.count
    
        def add(self, value):
            """Adds value to buffer, overwrite as needed."""
            if self.isFull():
                self.low = (self.low+1) % self.size
            else:
                self.count += 1
            self.buffer[self.high] = value
            self.high = (self.high + 1) % self.size
    
        def remove(self):
            """Removes oldest value from non-empty buffer."""
            if self.count == 0:
                raise Exception ("Circular Buffer is empty");
            value = self.buffer[self.low]
            self.low = (self.low + 1) % self.size
            self.count -= 1
            return value
    
        def __iter__(self):
            """Return elements in the circular buffer in order using iterator."""
            idx = self.low
            num = self.count
            while num > 0:
                yield self.buffer[idx]
                idx = (idx + 1) % self.size
                num -= 1
    
        def __repr__(self):
            """String representation of circular buffer."""
            if self.isEmpty():
                return 'cb:[]'
    
            return 'cb:[' + ','.join(map(str,self)) + ']'
    

    https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

    【讨论】:

      【解决方案10】:

      我在进行串行编程之前遇到过这个问题。一年多前的那个时候,我也找不到任何有效的实现,所以我最终写了one as a C extension,它也可以在 MIT 许可下使用on pypi。它是超级基本的,只处理 8 位有符号字符的缓冲区,但长度灵活,所以如果你需要字符以外的东西,你可以使用 Struct 或其他东西。我现在通过谷歌搜索看到这些天有几个选项,所以你可能也想看看这些。

      【讨论】:

        【解决方案11】:

        这个不需要任何库。它会增长一个列表,然后按索引在其中循环。

        占用空间非常小(没有库),它的运行速度至少是 dequeue 的两倍。这确实有助于计算移动平均值,但请注意,这些项目不会像上面那样按年龄排序。

        class CircularBuffer(object):
            def __init__(self, size):
                """initialization"""
                self.index= 0
                self.size= size
                self._data = []
        
            def record(self, value):
                """append an element"""
                if len(self._data) == self.size:
                    self._data[self.index]= value
                else:
                    self._data.append(value)
                self.index= (self.index + 1) % self.size
        
            def __getitem__(self, key):
                """get element by index like a regular array"""
                return(self._data[key])
        
            def __repr__(self):
                """return string representation"""
                return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
        
            def get_all(self):
                """return a list of all the elements"""
                return(self._data)
        

        取平均值,例如:

        q= CircularBuffer(1000000);
        for i in range(40000):
            q.record(i);
        print "capacity=", q.size
        print "stored=", len(q.get_all())
        print "average=", sum(q.get_all()) / len(q.get_all())
        

        结果:

        capacity= 1000000
        stored= 40000
        average= 19999
        
        real 0m0.024s
        user 0m0.020s
        sys  0m0.000s
        

        这大约是 dequeue 等效时间的 1/3。

        【讨论】:

        • 你的__getitem__不应该更强大一点吗:self._data[(key + self._index + 1) % self._size]
        • 为什么要移动 +1 ?现在,是的,请参阅下面的 Basj 变体了解这个想法
        • 你的计时码有缺陷。在将 max_size 设置为 1000000 后调用 append 40000 次,所以你只测试底层列表的 append 方法。
        • 你是对的;但我几乎看不到它发生时它会变得比出队慢(附加比替换慢 - 由于可能的内存重新分配)。由于上面的@basj 增强了我的代码(他自然会获得更多的学分),所以我把它留给其他人花更多时间在这上面并检查它;)
        【解决方案12】:

        最初的问题是:“高效”循环缓冲区。 根据所要求的效率,来自 aaronasterling 的答案似乎是绝对正确的。 使用用 Python 编写的专用类并将时间处理与 collections.deque 进行比较显示,使用 deque 可实现 5.2 倍的加速! 这是一个非常简单的代码来测试这个:

        class cb:
            def __init__(self, size):
                self.b = [0]*size
                self.i = 0
                self.sz = size
            def append(self, v):
                self.b[self.i] = v
                self.i = (self.i + 1) % self.sz
        
        b = cb(1000)
        for i in range(10000):
            b.append(i)
        # called 200 times, this lasts 1.097 second on my laptop
        
        from collections import deque
        b = deque( [], 1000 )
        for i in range(10000):
            b.append(i)
        # called 200 times, this lasts 0.211 second on my laptop
        

        要将双端队列转换为列表,只需使用:

        my_list = [v for v in my_deque]
        

        然后您将获得对双端队列项的 O(1) 随机访问。当然,这仅在设置一次后需要对双端队列进行多次随机访问时才有价值。

        【讨论】:

          【解决方案13】:

          这将相同的原则应用于一些旨在保存最新文本消息的缓冲区。

          import time
          import datetime
          import sys, getopt
          
          class textbffr(object):
              def __init__(self, size_max):
                  #initialization
                  self.posn_max = size_max-1
                  self._data = [""]*(size_max)
                  self.posn = self.posn_max
          
              def append(self, value):
                  #append an element
                  if self.posn == self.posn_max:
                      self.posn = 0
                      self._data[self.posn] = value   
                  else:
                      self.posn += 1
                      self._data[self.posn] = value
          
              def __getitem__(self, key):
                  #return stored element
                  if (key + self.posn+1) > self.posn_max:
                      return(self._data[key - (self.posn_max-self.posn)])
                  else:
                      return(self._data[key + self.posn+1])
          
          
          def print_bffr(bffr,bffer_max): 
              for ind in range(0,bffer_max):
                  stored = bffr[ind]
                  if stored != "":
                      print(stored)
              print ( '\n' )
          
          def make_time_text(time_value):
              return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
                + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
                + str(time_value.second).zfill(2))
          
          
          def main(argv):
              #Set things up 
              starttime = datetime.datetime.now()
              log_max = 5
              status_max = 7
              log_bffr = textbffr(log_max)
              status_bffr = textbffr(status_max)
              scan_count = 1
          
              #Main Loop
              # every 10 secounds write a line with the time and the scan count.
              while True: 
          
                  time_text = make_time_text(datetime.datetime.now())
                  #create next messages and store in buffers
                  status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
                  log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
          
                  #print whole buffers so far
                  print_bffr(log_bffr,log_max)
                  print_bffr(status_bffr,status_max)
          
                  time.sleep(2)
                  scan_count += 1 
          
          if __name__ == '__main__':
              main(sys.argv[1:])  
          

          【讨论】:

            【解决方案14】:

            我在这里没有得到答案。显然,如果您在 NumPy 中工作,您通常希望对数组或 ndarray 进行子类化,这样(至少在循环数组已满时)您仍然可以在循环数组上使用 NumPy 数组算术运算。唯一需要注意的是,对于跨越多个组件的操作(例如移动平均线),您的窗口不会大于缓冲区中累积的值。

            此外,正如所有评论者所提到的,不要使用滚动,因为这违背了效率的目的。如果您需要一个不断增长的数组,只需在每次需要调整大小时将其大小加倍(这与循环数组实现不同)。

            【讨论】:

              猜你喜欢
              • 2013-09-26
              • 1970-01-01
              • 1970-01-01
              • 2019-06-08
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多