【问题标题】:How to solve Type mismatch issue (expected: Double, actual: Unit)如何解决类型不匹配问题(预期:Double,实际:Unit)
【发布时间】:2016-05-02 14:29:48
【问题描述】:

这是我计算均方根误差的函数。但是由于错误Type mismatch issue (expected: Double, actual: Unit),最后一行无法编译。我尝试了许多不同的方法来解决这个问题,但仍然没有成功。有什么想法吗?

  def calculateRMSE(output: DStream[(Double, Double)]): Double = {
        val summse = output.foreachRDD { rdd =>
          rdd.map {
              case pair: (Double, Double) =>
                val err = math.abs(pair._1 - pair._2);
                err*err
          }.reduce(_ + _)
        }
        // math.sqrt(summse)  HOW TO APPLY SQRT HERE?
  }

【问题讨论】:

  • @Yuval Itzchakov:我想计算流数据的均方根误差。也许我尝试完成这项任务的方式是不正确的。如果是这样,我想知道正确的方法,假设输入数据的类型为DStream[(Double,Double)]
  • 我仍然不确定你在做什么。
  • @eliasah:DStream 包含成对的 Double,例如((5.0, 5.2), (5.1, 5.15)...) 假设对中的第一个元素是实际值,而第二个元素是预测值。我需要做的是使用均方根误差 (RMSE) 度量来计算实际值和预测值之间的误差。当我在流中获得新数据时,RMSE 应该明显改变(即应该使用我的函数 calculateRMSE 重新计算)。这不可能吗?
  • 你想用这个 RMSE 做什么? foreachRDD 不返回任何值
  • 这和你之前的问题stackoverflow.com/questions/36978409/…基本一样。但正如@eliasah 所说,您正在遍历每个 RDD,然后将计算结果丢弃,因为 foreach 不会返回值。

标签: scala apache-spark rdd dstream


【解决方案1】:

正如 eliasah 指出的,foreach(和foreachRDD)不返回值;它们仅用于副作用。如果你想退货,你需要map。根据您的第二个解决方案:

val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)

如果你为它做一个小功能,它看起来会更好:

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

val rmse = output.map(getRmse)

忽略空 RDD,

val rmse = output.filter(_.nonEmpty).map(getRmse)

这与理解的顺序完全相同。它只是 map、flatMap 和 filter 的语法糖,但我认为当我第一次学习 Scala 时它更容易理解:

val rmse = for {
  rdd <- output
  if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError

这是一个汇总错误的函数,就像您的第一次尝试一样:

def calculateRmse(output: DStream[(Double, Double)]): Double = {

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}

编译器对nonEmpty的抱怨其实是DStream的filter方法的问题。 filter 不是对 DStream 中的 RDD 进行操作,而是对 DStream 的类型参数给定的双精度对 (Double, Double) 进行操作。

我对 Spark 的了解还不够,不能说它是一个缺陷,但它很奇怪。 Filter 和大多数其他对集合的操作通常是 defined in terms of foreach,但是 DStream 实现了这些功能而不遵循相同的约定;它已弃用的方法 foreach 和当前的 foreachRDD 都在流的 RDD 上运行,但 its other higher-order methods don't

所以我的方法行不通。 DStream 可能有一个奇怪的理由(与性能相关?)这可能是使用foreach 的不好方法:

def calculateRmse(ds: DStream[(Double, Double)]): Double = {

  var totalError: Double = 0

  def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError

  ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))

  totalError
}

但它有效!

【讨论】:

  • 谢谢。函数 calculateRMSE 无法编译 - 它显示 Cannot resolve symbol rdd, + and nonEmpty。你知道为什么会这样吗?
  • 如果是nonEmpty,它也会显示Type mismatch, expected ((Double,Double))=&gt;Boolean, actual ((Double,Double))=&gt;Any
  • 哎呀。我在getRmse 中犯了一个错误,括号应该只在函数的左半边。但是rdd+ 的问题稍微严重一些。我将编辑我的答案。
【解决方案2】:

我设法完成了以下任务:

import org.apache.spark.mllib.evaluation.RegressionMetrics

output.foreachRDD { rdd =>
  if (!rdd.isEmpty)
    {
      val metrics = new RegressionMetrics(rdd)
      val rmse = metrics.rootMeanSquaredError
      println("RMSE: " + rmse)
    }
}

【讨论】:

  • 是的,我希望如此:) foreachRDD 不返回任何值。我虽然应该在不同的 RDD 上累积 MSE,然后估计 RMSE。但方法不同,如答案所示。
  • 除了打印它之外,您仍然没有对 RMSE 做任何事情
猜你喜欢
  • 1970-01-01
  • 2021-02-07
  • 2021-09-15
  • 2017-04-07
  • 2019-12-07
  • 2016-11-21
  • 2019-05-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多