【问题标题】:calculate exponential moving average in python在python中计算指数移动平均线
【发布时间】:2010-10-04 01:43:45
【问题描述】:

我有一个日期范围和每个日期的测量值。我想计算每个日期的指数移动平均值。有人知道怎么做吗?

我是 python 新手。标准python库中似乎没有内置平均值,这让我觉得有点奇怪。也许我没找对地方。

那么,给定以下代码,我如何计算日历日期的 IQ 点的移动加权平均值?

from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]

(可能有更好的方法来构建数据,任何建议将不胜感激)

【问题讨论】:

    标签: python signal-processing average digital-filter


    【解决方案1】:

    编辑: 似乎来自scikits.timeseries.lib.moving_funcs 子模块的mov_average_expw() 函数来自SciKits (补充SciPy 的附加工具包)更适合您问题的措辞。


    要使用平滑因子 alpha(在维基百科的术语中为 (1 - alpha))计算您的数据的 exponential smoothing

    >>> alpha = 0.5
    >>> assert 0 < alpha <= 1.0
    >>> av = sum(alpha**n.days * iq 
    ...     for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq), 
    ...         sorted(zip(days, IQ), key=lambda p: p[0], reverse=True)))
    95.0
    

    上面的不太好看,我们稍微重构一下吧:

    from collections import namedtuple
    from operator    import itemgetter
    
    def smooth(iq_data, alpha=1, today=None):
        """Perform exponential smoothing with factor `alpha`.
    
        Time period is a day.
        Each time period the value of `iq` drops `alpha` times.
        The most recent data is the most valuable one.
        """
        assert 0 < alpha <= 1
    
        if alpha == 1: # no smoothing
            return sum(map(itemgetter(1), iq_data))
    
        if today is None:
            today = max(map(itemgetter(0), iq_data))
    
        return sum(alpha**((today - date).days) * iq for date, iq in iq_data)
    
    IQData = namedtuple("IQData", "date iq")
    
    if __name__ == "__main__":
        from datetime import date
    
        days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
        IQ = [110, 105, 90]
        iqdata = list(map(IQData, days, IQ))
        print("\n".join(map(str, iqdata)))
    
        print(smooth(iqdata, alpha=0.5))
    

    例子:

    $ python26 smooth.py
    IQData(date=datetime.date(2008, 1, 1), iq=110)
    IQData(date=datetime.date(2008, 1, 2), iq=105)
    IQData(date=datetime.date(2008, 1, 7), iq=90)
    95.0
    

    【讨论】:

    • 嗨 J.F. Sebastian,我想使用这个 EWMA 公式在我的网站上显示趋势。我在 SO — stackoverflow.com/questions/9283856 上发布了一个问题。有人为此建议了 EWMA 算法,因为我需要更多地强调最近的项目而不是旧的项目。由于我没有统计经验,我对如何计算α 的值感到有些困惑。有什么帮助吗?谢谢。
    • 链接页面已失效,能否更新一下?
    • @sebix:随意编辑。如果谷歌没有帮助,请尝试wayback machine
    • 什么是平滑因子?
    • @KshitijAgrawal:点击答案中的the "exponential smoothing" link
    【解决方案2】:

    我用谷歌搜索了一下,发现了以下示例代码 (http://osdir.com/ml/python.matplotlib.general/2005-04/msg00044.html):

    def ema(s, n):
        """
        returns an n period exponential moving average for
        the time series s
    
        s is a list ordered from oldest (index 0) to most
        recent (index -1)
        n is an integer
    
        returns a numeric array of the exponential
        moving average
        """
        s = array(s)
        ema = []
        j = 1
    
        #get n sma first and calculate the next n period ema
        sma = sum(s[:n]) / n
        multiplier = 2 / float(1 + n)
        ema.append(sma)
    
        #EMA(current) = ( (Price(current) - EMA(prev) ) x Multiplier) + EMA(prev)
        ema.append(( (s[n] - sma) * multiplier) + sma)
    
        #now calculate the rest of the values
        for i in s[n+1:]:
            tmp = ( (i - ema[j]) * multiplier) + ema[j]
            j = j + 1
            ema.append(tmp)
    
        return ema
    

    【讨论】:

    • 为什么函数使用与函数同名的局部变量?除了使代码的可读性稍差之外,它还可能引入难以检测到的逻辑错误……
    • s = array(s) 的意义何在?在我将其注释掉之前,我遇到了语法错误。
    • @chjortlund 我不确定您所说的“列表中的第二个项目将是 SMA”是什么意思。当前 EMA 值基于前一个值,但您必须从某个地方开始,因此将 SMA 作为设置的初始值。这是计算 EMA 的正确方法。
    • @Zuku 真的,我已经删除了我的评论。回到我做的时候,我正在寻找一种算法来处理实时传入的数据,而上面的 sn-p 不适合那个用例(也没有被宣传为)——我的错误!
    【解决方案3】:

    我总是用 Pandas 计算 EMA:

    下面是一个例子:

    import pandas as pd
    import numpy as np
    
    def ema(values, period):
        values = np.array(values)
        return pd.ewma(values, span=period)[-1]
    
    values = [9, 5, 10, 16, 5]
    period = 5
    
    print ema(values, period)
    

    更多关于 Pandas EWMA 的信息:

    http://pandas.pydata.org/pandas-docs/stable/generated/pandas.ewma.html

    【讨论】:

    • 新版本的 Pandas 不是有新的更好的functions吗?
    • s.ewm(span = 2/alpha-1).mean() 其中s 是一个系列
    • @user3226167 你如何使 alpha = y ?
    • @luky alpha 表示smoothing factor。你的意思是如何从numpy数组创建ss = pd.Series(y)
    • @user3226167 不,我认为“alpha”是变量 X,但后来我发现方程已经包含在函数中,只是改变了静态 alpha 参数
    【解决方案4】:

    您也可以使用 SciPy 过滤器方法,因为 EMA 是 IIR 过滤器。与 enumerate() 方法相比,在我的系统上使用 timeit 在大型数据集上测得的速度大约提高了 64 倍。

    import numpy as np
    from scipy.signal import lfilter
    
    x = np.random.normal(size=1234)
    alpha = .1 # smoothing coefficient
    zi = [x[0]] # seed the filter state with first value
    # filter can process blocks of continuous data if <zi> is maintained
    y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi)
    

    【讨论】:

      【解决方案5】:

      我不了解 Python,但是对于平均部分,您是指形式呈指数衰减的低通滤波器

      y_new = y_old + (input - y_old)*alpha
      

      其中 alpha = dt/tau,dt = 滤波器的时间步长,tau = 滤波器的时间常数? (这个变时间步的形式如下,只要把dt/tau剪成不大于1.0)

      y_new = y_old + (input - y_old)*dt/tau
      

      如果要过滤日期等内容,请确保转换为浮点数,例如自 1970 年 1 月 1 日以来的秒数。

      【讨论】:

        【解决方案6】:

        我的 python 有点生疏(如果我以某种方式弄乱了语法,任何人都可以随意编辑此代码以进行更正),但是这里......

        def movingAverageExponential(values, alpha, epsilon = 0):
        
           if not 0 < alpha < 1:
              raise ValueError("out of range, alpha='%s'" % alpha)
        
           if not 0 <= epsilon < alpha:
              raise ValueError("out of range, epsilon='%s'" % epsilon)
        
           result = [None] * len(values)
        
           for i in range(len(result)):
               currentWeight = 1.0
        
               numerator     = 0
               denominator   = 0
               for value in values[i::-1]:
                   numerator     += value * currentWeight
                   denominator   += currentWeight
        
                   currentWeight *= alpha
                   if currentWeight < epsilon: 
                      break
        
               result[i] = numerator / denominator
        
           return result
        

        此函数向后移动,从列表的末尾到开头,通过向后计算每个值的指数移动平均值,直到元素的权重系数小于给定的 epsilon。

        在函数结束时,它会在返回列表之前反转值(这样它们对于调用者来说是正确的顺序)。

        (旁注:如果我使用的不是python语言,我会先创建一个全尺寸的空数组,然后按倒序填充它,这样我就不必在最后反转它。但我不认为你可以在 python 中声明一个大的空数组。在 python 列表中,追加比前置要便宜得多,这就是我以相反顺序构建列表的原因。如果我错了,请纠正我。)

        “alpha”参数是每次迭代的衰减因子。例如,如果您使用 0.5 的 alpha,那么今天的移动平均值将由以下加权值组成:

        today:        1.0
        yesterday:    0.5
        2 days ago:   0.25
        3 days ago:   0.125
        ...etc...
        

        当然,如果您有大量的值,那么十天或十五天前的值对今天的加权平均值贡献不大。 'epsilon' 参数可让您设置一个截止点,低于该截止点​​您将不再关心旧值(因为它们对今天的价值的贡献将是微不足道的)。

        你会像这样调用函数:

        result = movingAverageExponential(values, 0.75, 0.0001)
        

        【讨论】:

        • 当非连续数据以非统一的时间间隔可用时,您如何将其应用于非连续数据,例如问题中的一个:今天、5 天前、6 天前?
        • 语法大部分是正确的,除了:'||' -> 'or', '&&' -> 'and', 'list.length' -> 'len(list)', if, while 附近的括号是不必要的。您可以在 Python 中创建一个列表的副本:result = values[:] 或创建一个大的“空”列表:result = [None]*len(values)
        • 条件可以写成如下: if not 0
        • 当(alpha==1 或 epsilon==0)时,您的算法是二次的。 M=log(epsilon)/log(alpha) 可能是一个很大的因素(如果 len(values) 很大,则执行内部循环的时间),所以我不会担心values.reverse()——它只是一个传递数据。
        • 有些算法允许一次性计算 AWME(请参阅 @earino 的答案中的 ema() 和我的 mov_average_expw()
        【解决方案7】:

        在 matplotlib.org 示例 (http://matplotlib.org/examples/pylab_examples/finance_work2.html) 中提供了一个使用 numpy 的指数移动平均 (EMA) 函数的好示例:

        def moving_average(x, n, type):
            x = np.asarray(x)
            if type=='simple':
                weights = np.ones(n)
            else:
                weights = np.exp(np.linspace(-1., 0., n))
        
            weights /= weights.sum()
        
            a =  np.convolve(x, weights, mode='full')[:len(x)]
            a[:n] = a[n]
            return a
        

        【讨论】:

          【解决方案8】:

          我发现@earino 的上述代码 sn-p 非常有用 - 但我需要一些可以持续平滑值流的东西 - 所以我将它重构为:

          def exponential_moving_average(period=1000):
              """ Exponential moving average. Smooths the values in v over ther period. Send in values - at first it'll return a simple average, but as soon as it's gahtered 'period' values, it'll start to use the Exponential Moving Averge to smooth the values.
              period: int - how many values to smooth over (default=100). """
              multiplier = 2 / float(1 + period)
              cum_temp = yield None  # We are being primed
          
              # Start by just returning the simple average until we have enough data.
              for i in xrange(1, period + 1):
                  cum_temp += yield cum_temp / float(i)
          
              # Grab the timple avergae
              ema = cum_temp / period
          
              # and start calculating the exponentially smoothed average
              while True:
                  ema = (((yield ema) - ema) * multiplier) + ema
          

          我是这样使用它的:

          def temp_monitor(pin):
              """ Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """
              ema = exponential_moving_average()
              next(ema)  # Prime the generator
          
              while True:
                  yield ema.send(val_to_temp(pin.read()))
          

          (其中 pin.read() 产生我想要使用的下一个值)。

          【讨论】:

            【解决方案9】:

            这是我根据http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:moving_averages 制作的一个简单示例

            请注意,与他们的电子表格不同,我不计算 SMA,也不会等待 10 个样本后生成 EMA。这意味着我的值略有不同,但如果您绘制它,它会在 10 个样本之后完全遵循。在前 10 个样本中,我计算的 EMA 被适当地平滑了。

            def emaWeight(numSamples):
                return 2 / float(numSamples + 1)
            
            def ema(close, prevEma, numSamples):
                return ((close-prevEma) * emaWeight(numSamples) ) + prevEma
            
            samples = [
            22.27, 22.19, 22.08, 22.17, 22.18, 22.13, 22.23, 22.43, 22.24, 22.29,
            22.15, 22.39, 22.38, 22.61, 23.36, 24.05, 23.75, 23.83, 23.95, 23.63,
            23.82, 23.87, 23.65, 23.19, 23.10, 23.33, 22.68, 23.10, 22.40, 22.17,
            ]
            emaCap = 10
            e=samples[0]
            for s in range(len(samples)):
                numSamples = emaCap if s > emaCap else s
                e =  ema(samples[s], e, numSamples)
                print e
            

            【讨论】:

              【解决方案10】:

              可能最短:

              #Specify decay in terms of span
              #data_series should be a DataFrame
              
              ema=data_series.ewm(span=5, adjust=False).mean()
              
              

              【讨论】:

                【解决方案11】:

                一种快速的方法(从here 复制粘贴)如下:

                def ExpMovingAverage(values, window):
                    """ Numpy implementation of EMA
                    """
                    weights = np.exp(np.linspace(-1., 0., window))
                    weights /= weights.sum()
                    a =  np.convolve(values, weights, mode='full')[:len(values)]
                    a[:window] = a[window]
                    return a
                

                【讨论】:

                • 如果你用 from scipy import signal 替换 np.convolve 会更快,a = signal.convolve(values, weights, mode='full') [:len(values)]
                【解决方案12】:

                我使用列表和衰减率作为输入。考虑到深度递归在 python 中并不稳定,我希望这个只有两行代码的小函数可以帮助到你。

                def expma(aseries, ratio):
                    return sum([ratio*aseries[-x-1]*((1-ratio)**x) for x in range(len(aseries))])
                

                【讨论】:

                  【解决方案13】:

                  更简单,使用 pandas

                  def EMA(tw):
                      for x in tw:
                          data["EMA{}".format(x)] = data['close'].ewm(span=x, adjust=False).mean()
                          EMA([10,50,100])
                  

                  【讨论】:

                    【解决方案14】:

                    Papahaba 的答案是几乎我正在寻找的(谢谢!),但我需要匹配初始条件。使用带有scipy.signal.lfilter 的 IIR 滤波器肯定是最有效的。这是我的还原:

                    给定一个 NumPy 向量,x

                    import numpy as np
                    from scipy import signal
                    
                    period = 12
                    b = np.array((1,), 'd')
                    a = np.array((period, 1-period), 'd')
                    zi = signal.lfilter_zi(b, a)
                    y, zi = signal.lfilter(b, a, x, zi=zi*x[0:1])
                    

                    获取向量y中返回的N点EMA(这里是12)

                    【讨论】:

                      【解决方案15】:

                      我在这里参加聚会有点晚了,但给出的解决方案都不是我想要的。使用递归和投资百科中给出的确切公式是一个不错的小挑战。 不需要 numpy 或 pandas。

                      prices = [{'i': 1, 'close': 24.5}, {'i': 2, 'close': 24.6}, {'i': 3, 'close': 24.8}, {'i': 4, 'close': 24.9},
                                {'i': 5, 'close': 25.6}, {'i': 6, 'close': 25.0}, {'i': 7, 'close': 24.7}]
                      
                      
                      def rec_calculate_ema(n):
                          k = 2 / (n + 1)
                          price = prices[n]['close']
                          if n == 1:
                              return price
                          res = (price * k) + (rec_calculate_ema(n - 1) * (1 - k))
                          return res
                      
                      
                      print(rec_calculate_ema(3))
                      

                      【讨论】:

                        猜你喜欢
                        • 2021-04-09
                        • 1970-01-01
                        • 1970-01-01
                        • 2021-05-30
                        • 2023-01-30
                        • 2023-04-07
                        • 1970-01-01
                        • 1970-01-01
                        相关资源
                        最近更新 更多