【发布时间】:2016-03-09 13:02:01
【问题描述】:
我正在 Spark Streaming 中编写一个应用程序,其中我需要计算双精度值的指数移动平均值并将该平均值添加到行中。 该平均值的计算方式如下:
EMA(t) = EMA(t-1)*0.75 + 值(t)*0.25
每个时间间隔,每个名称都会输入一行数据:
(name1-24/04/2015 15:31; 观察(name1; 24/04/2015 15:31; 132.45))
(name2-24/04/2015 15:31; 观察(name2; 24/04/2015 15:31; 20.5))
我的唯一键存在一个名称和一个粘贴在一起的时间戳。然后我将名称和时间戳分开,然后是我的双精度值。我将跟踪每个不同名称的指数移动平均线。
我正在使用 updateStateByKey() 执行此操作,效果很好: (名称将是此操作中的关键,因为我需要每个名称的平均值)
case class Observation(name: String, time: Timestamp, outcome: Double)
val outcomeDstream: DStream[(String, Double)] =
parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs.close) }
def updateEMA(newValues: Seq[Double],oldCount: Option[Double]): Option[Double] = {
if (oldCount.isEmpty) newValues(0)
else Some((newValues(0)*0.25) + (oldCount.get*(0.75)))
}
val ema = outcomeDstream.updateStateByKey[Double](updateEMA _)
我遇到的问题是:如果我使用这个函数来跟踪我的指数移动平均线,它将返回我:(name, expMovAvg)。但是我会丢失我的唯一密钥和时间戳。这样做的问题是我无法将这个 ema-Dstream 与我的原始流加入,因为我的密钥现在只是不唯一的名称。
是否有可能在我的 updateStateByKey 转换过程中保留唯一键或时间戳?
【问题讨论】:
标签: scala apache-spark spark-streaming