【问题标题】:smoothing irregularly sampled time data平滑不规则采样的时间数据
【发布时间】:2010-11-04 15:42:55
【问题描述】:

给定一个表格,其中第一列是某个参考点之后的秒数,第二列是任意测量值:

6   0.738158581
21  0.801697222
39  1.797224596
49  2.77920469
54  2.839757536
79  3.832232283
91  4.676794376
97  5.18244704
100 5.521878863
118 6.316630137
131 6.778507504
147 7.020395216
157 7.331607129
176 7.637492223
202 7.848079136
223 7.989456499
251 8.76853608
278 9.092367123 
    ...

如您所见,测量是在不规则的时间点采样的。我需要通过平均每次测量前 100 秒的读数(在 Python 中)来平滑数据。由于数据表很大,因此确实首选基于迭代器的方法。 不幸的是,经过两个小时的编码,我无法找到高效而优雅的解决方案。

谁能帮帮我?

编辑s

  1. 我希望每个原始读数都有一个平滑读数,并且平滑读数是原始读数和前 100 (delta) 秒内任何其他读数的算术平均值。 (约翰,你是对的)

  2. 巨大的 ~ 1e6 - 10e6 行 + 需要使用紧张的 RAM

  3. 数据近似随机游走

  4. 数据已排序

分辨率

我已经测试过 J Machin 和 yairchu 提出的解决方案。他们都给出了相同的结果,但是,在我的数据集上,J Machin 的版本呈指数增长,而 yairchu 的版本是线性的。以下是由 IPython 的 %timeit 测量的执行时间(以微秒为单位):

data size   J Machin    yairchu
10        90.2        55.6
50          930         258
100         3080        514
500         64700       2660
1000        253000      5390
2000        952000      11500

感谢大家的帮助。

【问题讨论】:

  • 是否太大而无法在 numpy 数组中处理?你有几样东西?
  • 这是线性插值来查找 100 的倍数的点吗?
  • 如果您有平滑要求,请详细说明。我尝试了几次,但我无法解析您的描述:“我需要通过在每次测量前 100 秒内平均读数来平滑数据”。
  • 请发布更多关于您的基准的信息。 AFAICT 这种行为(比指数更二次方!)只有在时间值没有上升或很小的正数时才会发生,以至于窗口包括迄今为止的所有或大部分读数。我的基准测试以大约 60% 的 Y 速度与已发布的代码实现线性结果,如果放弃 sum() 以支持对总数和计数的增量调整,我的速度将翻倍至 Y 速度的 120%。注意:答案与 14 sig dec 数字相同。
  • 继续:我使用 max(0.1, random.normalvariate(mu=16.0, sigma=7.81)) 生成随机正时间间隔; 0.1 是为了避免任何负面影响,而 16.0000000 :-) 和 7.81 是从您的样本中安装的。最初我得到二次行为,直到我注意到我把最小值而不是最大值!顺便说一句,正如你所说的随机游走,我使用了 new_reading = old_reading + random.random()。

标签: python datetime data-mining smoothing


【解决方案1】:

这样的事情怎么样,继续存储值直到与上次的时间差> 100,平均并产生这样的值 例如

def getAvgValues(data):
    lastTime = 0
    prevValues = []
    avgSampleTime=100

    for t, v in data:
        if t - lastTime < avgSampleTime:
            prevValues.append(v)
        else:
            avgV = sum(prevValues)/len(prevValues)
            lastTime = t
            prevValues = [v]
            yield (t,avgV)

for v in getAvgValues(data):
    print v

【讨论】:

  • 他要求所有原始测量时间的前 100 秒的平均值。你给他的例子只有 2 个结果
  • 嗯我误解了,任何方式看起来你修改它以获得正确的解决方案
  • 我并没有真正修改它。我刚刚使用了你的变量名。
【解决方案2】:

您还没有确切说明您想要输出的时间。我假设您希望每个原始读数都有一个平滑读数,并且平滑读数是原始读数和前 100(增量)秒内任何其他读数的算术平均值。

简答:使用 collections.deque ...它的读数永远不会超过“delta”秒。按照我的设置方式,您可以将 deque 视为一个列表,并轻松计算平均值或一些花哨的 gizmoid,这些 gizmoid 对最近的读数具有更大的权重。

长答案:

>>> the_data = [tuple(map(float, x.split())) for x in """\
... 6       0.738158581
... 21      0.801697222
[snip]
... 251     8.76853608
... 278     9.092367123""".splitlines()]
>>> import collections
>>> delta = 100.0
>>> q = collections.deque()
>>> for t, v in the_data:
...     while q and q[0][0] <= t - delta:
...         # jettison outdated readings
...         _unused = q.popleft()
...     q.append((t, v))
...     count = len(q)
...     print t, sum(item[1] for item in q) / count, count
...
...
6.0 0.738158581 1
21.0 0.7699279015 2
39.0 1.112360133 3
49.0 1.52907127225 4
54.0 1.791208525 5
79.0 2.13137915133 6
91.0 2.49500989771 7
97.0 2.8309395405 8
100.0 3.12993279856 9
118.0 3.74976297144 9
131.0 4.41385300278 9
147.0 4.99420529389 9
157.0 5.8325615685 8
176.0 6.033109419 9
202.0 7.15545189083 6
223.0 7.4342562845 6
251.0 7.9150342134 5
278.0 8.4246097095 4
>>>

编辑

一站式商店:在这里获取您喜欢的小工具。代码如下:

numerator = sum(item[1] * upsilon ** (t - item[0]) for item in q)
denominator = sum(upsilon ** (t - item[0]) for item in q)
gizmoid = numerator / denominator

其中 upsilon 应该略小于 1.0(

【讨论】:

  • 在我看来,这里可以使用常规列表,使用 .pop(0) 而不是 .popleft()。 collections.deque 有什么优势?
  • 弹出一个 Python 列表的左边是 O(N);弹出双端队列的左侧是 O(1)
【解决方案3】:

您的数据似乎大致呈线性:

Plot of your data http://rix0r.nl/~rix0r/share/shot-20090621.144851.gif

您在寻找什么样的平滑?一条线与该数据集的最小二乘拟合?某种低通滤波器?还是别的什么?

请告诉我们申请情况,以便我们为您提供更好的建议。

编辑:例如,根据应用程序,在第一个点和最后一个点之间插入一条线可能足以满足您的目的。

【讨论】:

    【解决方案4】:

    我正在使用求和结果,我将在其中添加新成员并减去旧成员。然而,这样一来,人们可能会遭受累积的浮点不准确性。

    因此,我实现了一个带有列表的“双端队列”。每当我的双端队列重新分配到更小的大小时。我在同一场合重新计算总和。

    我还在计算直到点 x 的平均值,包括点 x,所以至少有一个样本点需要平均。

    def getAvgValues(data, avgSampleTime):
      lastTime = 0
      prevValsBuf = []
      prevValsStart = 0
      tot = 0
      for t, v in data:
        avgStart = t - avgSampleTime
        # remove too old values
        while prevValsStart < len(prevValsBuf):
          pt, pv = prevValsBuf[prevValsStart]
          if pt > avgStart:
            break
          tot -= pv
          prevValsStart += 1
        # add new item
        tot += v
        prevValsBuf.append((t, v))
        # yield result
        numItems = len(prevValsBuf) - prevValsStart
        yield (t, tot / numItems)
        # clean prevVals if it's time
        if prevValsStart * 2 > len(prevValsBuf):
          prevValsBuf = prevValsBuf[prevValsStart:]
          prevValsStart = 0
          # recalculate tot for not accumulating float precision error
          tot = sum(v for (t, v) in prevValsBuf)
    

    【讨论】:

    • (1) 这是一个非常有趣的双端队列实现。 (2) 我怀疑 OP 是否非常担心浮点舍入误差的累积;与平滑的严重变化相比,它们肯定是非常小的削减......但如果他是,可以考虑使用 Kahan 加法器来维持运行总数。
    • 请注意,与指数移动平均线 (stackoverflow.com/questions/1023860/…) 相比,这是非常计算密集型的。除非您特别需要确保时间范围内的所有样本都具有同等贡献,而较旧的样本根本没有贡献,否则我会选择效率更高的 EMA。
    • @Curt Sampson:OP 特别要求这样做
    【解决方案5】:

    这个使它线性化:

    def process_data(datafile):
        previous_n = 0
        previous_t = 0
        for line in datafile:
            t, number = line.strip().split()
            t = int(t)
            number = float(number)
            delta_n = number - previous_n
            delta_t = t - previous_t
            n_per_t = delta_n / delta_t
            for t0 in xrange(delta_t):
                yield previous_t + t0, previous_n + (n_per_t * t0)
            previous_n = n
            previous_t = t
    
    f = open('datafile.dat')
    
    for sample in process_data(f):
        print sample
    

    【讨论】:

    • (1) .strip() 是多余的。 (2) 您似乎忘记每次都更新 previous_* (3) 即便如此,这意味着什么并不明显......它似乎会在先前读数和当前读数之间进行线性插值,每隔一秒 - 对 OP 要求的有趣解释。 (3) 我想你的意思是for t0 in xrange(1, delta_t + 1)
    【解决方案6】:

    听起来您需要一个简单的舍入公式。将任意数字四舍五入到任意区间:

    轮次(次数/间隔)*间隔

    您可以用地板或天花板代替圆形来代替“导致”或“因为”影响。它可以用任何语言工作,包括 SQL。

    【讨论】:

      【解决方案7】:

      O(1) 内存,以防您可以多次迭代输入 - 您可以对“左”使用一个迭代器,对“右”使用一个迭代器。

      def getAvgValues(makeIter, avgSampleTime):
        leftIter = makeIter()
        leftT, leftV = leftIter.next()
        tot = 0
        count = 0
        for rightT, rightV in makeIter():
          tot += rightV
          count += 1
          while leftT <= rightT - avgSampleTime:
            tot -= leftV
            count -= 1
            leftT, leftV = leftIter.next()
          yield rightT, tot / count
      

      【讨论】:

      • 我猜 OP 想要实时显示平滑值......想想重症监护病房的心跳监测器。
      【解决方案8】:

      虽然它给出了一个指数衰减的平均值,而不是一个总平均值,但我认为您可能想要我所说的 exponential moving average with varying alpha,它实际上是一个单极低通滤波器。这个问题现在有了一个解决方案,它的运行时间与数据点的数量成线性关系。看看它是否适合你。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-04-10
        • 2011-04-21
        • 2017-04-27
        • 1970-01-01
        • 2021-05-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多