【问题标题】:Spark Streaming: join back to original stream after UpdateStateByKeySpark Streaming:在 UpdateStateByKey 之后加入原始流
【发布时间】:2016-03-09 13:02:01
【问题描述】:

我正在 Spark Streaming 中编写一个应用程序,其中我需要计算双精度值的指数移动平均值并将该平均值添加到行中。 该平均值的计算方式如下:

EMA(t) = EMA(t-1)*0.75 + 值(t)*0.25

每个时间间隔,每个名称都会输入一行数据:

(name1-24/04/2015 15:31; 观察(name1; 24/04/2015 15:31; 132.45))

(name2-24/04/2015 15:31; 观察(name2; 24/04/2015 15:31; 20.5))

我的唯一键存在一个名称和一个粘贴在一起的时间戳。然后我将名称和时间戳分开,然后是我的双精度值。我将跟踪每个不同名称的指数移动平均线。

我正在使用 updateStateByKey() 执行此操作,效果很好: (名称将是此操作中的关键,因为我需要每个名称的平均值)

case class Observation(name: String, time: Timestamp, outcome: Double)

val outcomeDstream: DStream[(String, Double)] = 
    parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs.close) }

def updateEMA(newValues: Seq[Double],oldCount: Option[Double]): Option[Double] = {
  if (oldCount.isEmpty) newValues(0)
  else Some((newValues(0)*0.25) + (oldCount.get*(0.75)))
}

val ema = outcomeDstream.updateStateByKey[Double](updateEMA _)

我遇到的问题是:如果我使用这个函数来跟踪我的指数移动平均线,它将返回我:(name, expMovAvg)。但是我会丢失我的唯一密钥和时间戳。这样做的问题是我无法将这个 ema-Dstream 与我的原始流加入,因为我的密钥现在只是不唯一的名称。

是否有可能在我的 updateStateByKey 转换过程中保留唯一键或时间戳?

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    如果我正确理解您的问题,您可以使用 Option[Observation] 作为状态,名称为键,而不是在 updateStateByKey 中保留 Option[Double] 作为状态,它将包含您需要的所有唯一数据:

    val outcomeDstream: DStream[(String, Observation)] = 
        parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs) }
    
    def updateEMA(newValues: Seq[Observation], 
                  oldCount: Option[Observation]): Option[Observation] = {
      if (oldCount.isEmpty) newValues(0)
      else Some((newValues(0).outcome * 0.25) + (oldCount.get.outcome * (0.75)))
    }
    

    附带说明,如果您使用的是 Spark 1.6.0,请考虑查看 PairDStreamFunctions.mapWithState。尽管语义略有不同(它不会处理尚未收到新值的密钥)并且仍处于试验阶段,it is superior in performance

    【讨论】:

    • 我自己都没想过这个!!!感谢您的帮助!这很完美!
    猜你喜欢
    • 1970-01-01
    • 2015-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-27
    • 2017-03-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多