【问题标题】:Spark Exponential Moving Average火花指数移动平均线
【发布时间】:2018-06-05 13:24:17
【问题描述】:

我有一个时间序列定价数据的数据框,其中包含 ID、日期和价格。

我需要计算价格列的指数移动平均线,并将其作为新列添加到数据框中。

我之前一直在使用 Spark 的窗口函数,它看起来很适合这个用例,但给出了 EMA 的公式:

EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)

在哪里

multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now

我对如何访问列中先前的计算值感到有点困惑,而实际上是对列进行窗口化。 使用简单的移动平均线很简单,因为您只需计算一个新列,同时对窗口中的元素进行平均:

var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

但似乎 EMA 有点复杂,因为在每一步我都需要之前的计算值。

我也查看了Weighted moving average in Pyspark,但我需要一种适用于 Spark/Scala 的方法,以及 10 或 30 天的 EMA。

有什么想法吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    最后,我分析了指数移动平均线是如何在 pandas 数据帧中实现的。除了我上面描述的递归公式,它在任何 sql 或窗口函数中都难以实现(因为它是递归的),还有另一个,在their issue tracker 上有详细说明:

    y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
           ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).
    

    鉴于此,在 here 的额外 spark 实现帮助下,我最终完成了以下实现,这与执行 pandas_dataframe.ewm(span=window_size).mean() 大致相同 .

    def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
      val window = Window.partitionBy(partitionColumn)
      val exponentialMovingAveragePrefix = "_EMA_"
    
      val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
        val alpha = 2.0 / (windowSize + 1)
        val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
          accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
        }
        (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
      })
      dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
        .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
        .drop("row_nr")
    }
    

    (我假设我需要计算指数移动平均值的列的类型是 Double。)

    我希望这对其他人有所帮助。

    【讨论】:

    • 你能帮我用 scala 调用这个函数吗?因为我得到一些与错误相关的数据不匹配。我正在发送列并期待字符串?
    猜你喜欢
    • 2022-01-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-09
    • 2021-08-26
    • 2017-01-19
    • 1970-01-01
    • 2017-09-01
    相关资源
    最近更新 更多