【问题标题】:Statistical accumulator in PythonPython中的统计累加器
【发布时间】:2010-09-22 23:15:55
【问题描述】:

统计累加器允许执行增量计算。例如,为了计算在任意时间给定的数字流的算术平均值,可以创建一个对象来跟踪给定项目的当前数量n 及其总和sum。当请求均值时,对象只返回sum/n

这样的累加器允许您进行增量计算,即当给定一个新数字时,您无需重新计算整个总和和计数。

可以为其他统计数据编写类似的累加器(参见boost library,用于 C++ 实现)。

您将如何在 Python 中实现累加器? The code I came up with 是:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

有趣的设计问题出现了:

  1. 如何制作蓄能器 线程安全?
  2. 如何安全删除 项目?
  3. 如何以某种方式构建架构 允许其他统计数据 轻松插入(统计工厂)

【问题讨论】:

  • python 的累加器库的状态是什么?我想将它与 PyTables 一起使用。

标签: python oop statistics accumulator


【解决方案1】:

对于一个通用的、线程安全的高级函数,您可以将以下内容与Queue.Queue 类和其他一些位结合使用:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

此生成器函数通过将其委托给其他实体来逃避其声称的大部分责任:

  • 它依赖Queue.Queue 以线程安全的方式提供其源元素
  • collections.deque 对象可以作为storage 参数的值传入;这提供了一种仅使用最后一个 n(在本例中为 3)值的便捷方式
  • 函数本身(在本例中为mean)作为参数传递。在某些情况下,这会导致代码效率不高,但很容易应用于各种情况。

请注意,如果您的生产者线程每个值花费的时间超过 0.1 秒,则累加器可能会超时。这很容易通过传递更长的超时时间或完全删除超时参数来解决。在后一种情况下,函数将在队列末尾无限期地阻塞;这种用法在子线程(通常是daemon 线程)中使用的情况下更有意义。当然,您也可以将传递给q.get 的参数参数化为Accumulator 的第四个参数。

如果您想从生产者线程(此处为putting_thread)传达队列结束,即没有更多的值,您可以传递并检查哨兵值或使用其他方法。 this thread 有更多信息;我选择编写一个名为 CloseableQueue 的 Queue.Queue 子类,它提供了一个 close 方法。

您可以通过多种其他方式自定义此类函数的行为,例如通过限制队列大小;这只是一个使用示例。

编辑

如上所述,由于需要重新计算,这会降低一些效率,而且我认为这并不能真正回答您的问题。

生成器函数也可以通过其send 方法接受值。因此,您可以编写一个均值生成器函数,例如

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

这里的 yield 表达式既将值(send 的参数)带入函数,同时将计算值作为 send 的返回值传递出去。

您可以将调用该函数返回的生成器传递给更优化的累加器生成器函数,例如:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass

【讨论】:

    【解决方案2】:

    如果我在 Python 中这样做,有两件事我会做不同的事情:

    1. 分离出每个累加器的功能。
    2. 不要以任何方式使用@property。

    对于第一个,我可能想提出一个用于执行累积的 API,可能类似于:

    def add(self, num) # add a number
    def compute(self) # compute the value of the accumulator
    

    然后我会创建一个 AccumulatorRegistry 来保存这些累加器,并允许用户调用操作并添加到所有这些累加器。代码可能如下所示:

    class Accumulators(object):
        _accumulator_library = {}
    
        def __init__(self):
            self.accumulator_library = {}
            for key, value in Accumulators._accumulator_library.items():
                self.accumulator_library[key] = value()
    
        @staticmethod
        def register(name, accumulator):
            Accumulators._accumulator_library[name] = accumulator
    
        def add(self, num):
            for accumulator in self.accumulator_library.values():
                accumulator.add(num)
    
        def compute(self, name):
            self.accumulator_library[name].compute()
    
        @staticmethod
        def register_decorator(name):
            def _inner(cls):
                Accumulators.register(name, cls)
                return cls
    
    
    @Accumulators.register_decorator("Mean")
    class Mean(object):
        def __init__(self):
            self.total = 0
            self.count = 0
    
        def add(self, num):
            self.count += 1
            self.total += num
    
        def compute(self):
            return self.total / float(self.count)
    

    我可能应该谈谈您的线程安全问题。 Python 的 GIL 可以保护您免受许多线程问题的影响。不过,您可以采取一些措施来保护自己:

    • 如果这些对象被本地化到一个线程,请使用 threading.local
    • 如果没有,您可以将操作包装在锁中,使用 with 上下文语法为您处理持有锁。

    【讨论】:

    • 关于register 的好主意 - 我同意我的初始示例过于实验性。我期待着更多的cmets。谢谢。
    • 我还应该注意,我正在使用 python 2.6+ 功能;)
    猜你喜欢
    • 1970-01-01
    • 2019-05-24
    • 2017-01-12
    • 2020-02-18
    • 1970-01-01
    • 1970-01-01
    • 2021-10-13
    • 2023-03-23
    • 2021-01-13
    相关资源
    最近更新 更多