【问题标题】:Arnaud Legoux Moving Average and numpyArnaud Legoux 移动平均线和 numpy
【发布时间】:2018-04-09 22:45:08
【问题描述】:

我想编写使用 NumPy(或 Pandas)计算 Arnaud Legoux 移动平均线的矢量版本的代码。你能帮我解决这个问题吗?谢谢。

非向量版本如下所示(见下文)。

def NPALMA(pnp_array, **kwargs) :
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensisitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length  / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma

【问题讨论】:

    标签: python pandas numpy vectorization


    【解决方案1】:

    开始方法

    我们可以沿第一个轴创建滑动窗口,然后使用张量乘法与wtd 值的范围进行求和。

    实现看起来像这样 -

    # Get all wtd values in an array
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    
    # Get the sliding windows for input array along first axis
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    
    # Initialize o/p array
    out = np.zeros(pnp_array.shape)
    
    # Get sum-reductions for the windows which don't need wrapping over
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    
    # Last element of the output needed wrapping. So, do it separately.
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    
    # Finally perform the divisions
    out /= wtds.sum()
    

    获取滑动窗口的函数:strided_axis0 来自here

    使用1D卷积提升

    那些与wtds 值的乘法以及它们的减和基本上是沿第一个轴的卷积。因此,我们可以使用scipy.ndimage.convolve1daxis=0。考虑到内存效率,这会快得多,因为我们不会创建巨大的滑动窗口。

    实现将是 -

    from scipy.ndimage import convolve1d as conv
    
    avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
    

    因此,非零行 out[length-1:] 将与 avgs[:-length+1] 相同。

    如果我们使用来自wtds 的非常小的内核数,可能会有一些精度差异。因此,如果使用此 convolution 方法,请记住这一点。

    运行时测试

    方法-

    def original_app(pnp_array, length, m, dss):
        alma = np.zeros(pnp_array.shape)
        wtd_sum = np.zeros(pnp_array.shape)
    
        for l in range(len(pnp_array)):
            if l >= asize:
                for i in range(length):
                    im = i - m
                    wtd = np.exp( -(im * im) / dss)
                    alma[l] += pnp_array[l - length + i] * wtd
                    wtd_sum[l] += wtd
                alma[l] = alma[l] / wtd_sum[l]
        return alma
    
    def vectorized_app1(pnp_array, length, m, dss):
        wtds = np.exp(-(np.arange(length) - m)**2/dss)
        pnp_array3D = strided_axis0(pnp_array,len(wtds))
        out = np.zeros(pnp_array.shape)
        out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
        out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
        out /= wtds.sum()
        return out
    
    def vectorized_app2(pnp_array, length, m, dss):
        wtds = np.exp(-(np.arange(length) - m)**2/dss)
        return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
    

    时间安排 -

    In [470]: np.random.seed(0)
         ...: m,n = 1000,100
         ...: pnp_array = np.random.rand(m,n)
         ...: 
         ...: length = 6
         ...: sigma = 0.3
         ...: offset = 0.5
         ...: 
         ...: asize = length - 1
         ...: m = np.floor(offset * asize)
         ...: s = length  / sigma
         ...: dss = 2 * s * s
         ...: 
    
    In [471]: %timeit original_app(pnp_array, length, m, dss)
         ...: %timeit vectorized_app1(pnp_array, length, m, dss)
         ...: %timeit vectorized_app2(pnp_array, length, m, dss)
         ...: 
    10 loops, best of 3: 36.1 ms per loop
    1000 loops, best of 3: 1.84 ms per loop
    1000 loops, best of 3: 684 µs per loop
    
    In [472]: np.random.seed(0)
         ...: m,n = 10000,1000 # rest same as previous one
    
    In [473]: %timeit original_app(pnp_array, length, m, dss)
         ...: %timeit vectorized_app1(pnp_array, length, m, dss)
         ...: %timeit vectorized_app2(pnp_array, length, m, dss)
         ...: 
    1 loop, best of 3: 503 ms per loop
    1 loop, best of 3: 222 ms per loop
    10 loops, best of 3: 106 ms per loop
    

    【讨论】:

    • 我用一维卷积检查了你的第二种方法。它看起来有什么问题。但我无法得到究竟是什么。我的例子:pnp_array = np.array([3924.00752506,5774.30335369,5519.40734814,4931.71344059])Offset = 0.85 sigma = 6长度= 3 m = 1.7 DSS = 0.5预期结果应[0,0,5594.1703042,5115.59420056]。但是第二个应用程序返回 [ 0 , 0 , 5693.3358598 , 5333.61073335]。所以累积误差是-317.182084168。这是因为您提到的内核数量少吗?
    • @Prokhozhii 该集合的wtdswtds = np.exp(-(np.arange(length) - m)**2/dss) 的值是多少?
    • 它们在第二个应用程序中是正确的:array([ 0.00308872, 0.3753111 , 0.83527021])
    • @Prokhozhii 我知道这些是正确的。我要求他们的价值观来了解他们如何与pnp_array 中的价值观进行比较。如果这些要小得多,那么是的,请使用 app#1。好的,所以它们看起来非常小,所以是的,请在这种情况下使用 app#1。这与我谈到的精度问题相同。
    • 据我了解,app#1 的索引有错误。它返回 [0, 0, 5199.97996566, 5594.17030842]。但是第 4 个元素应该是第 3 个元素,而第 5 个元素(不存在)应该是第 4 个元素。请问你能帮忙吗?
    猜你喜欢
    • 2014-05-24
    • 1970-01-01
    • 2017-09-01
    • 2022-01-26
    • 2013-12-22
    • 1970-01-01
    • 2011-06-29
    • 2019-04-24
    相关资源
    最近更新 更多