【问题标题】:Spark : Average of values instead of sum in reduceByKey using ScalaSpark:使用Scala的reduceByKey中的平均值而不是总和
【发布时间】:2018-02-15 15:22:10
【问题描述】:

当调用 reduceByKey 时,它将所有具有相同键的值相加。有什么方法可以计算每个键的平均值吗?

// I calculate the sum like this and don't know how to calculate the avg
reduceByKey((x,y)=>(x+y)).collect


Array(((Type1,1),4.0), ((Type1,1),9.2), ((Type1,2),8), ((Type1,2),4.5), ((Type1,3),3.5), 
((Type1,3),5.0), ((Type2,1),4.6), ((Type2,1),4), ((Type2,1),10), ((Type2,1),4.3))

【问题讨论】:

标签: scala apache-spark


【解决方案1】:

一种方法是使用比aggregateByKey 更容易的mapValues 和reduceByKey。

.mapValues(value => (value, 1)) // map entry with a count of 1
.reduceByKey {
  case ((sumL, countL), (sumR, countR)) => 
    (sumL + sumR, countL + countR)
}
.mapValues { 
  case (sum , count) => sum / count 
}
.collect

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

【讨论】:

    【解决方案2】:

    有很多方法......但一个简单的方法是使用一个类来跟踪您的总数和计数并在最后计算平均值。像这样的东西会起作用。

    class AvgCollector(val tot: Double, val cnt: Int = 1) {
      def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt)
      def avg = tot / cnt 
    }
    
    val rdd2 = {
      rdd
      .map{ case (k,v) => (k, new AvgCollector(v)) }
      .reduceByKey(_ combine _)
      .map{ case (k,v) => (k, v.avg) }
    }
    

    ...或者您可以使用 aggregateByKey 调整类

    class AvgCollector(val tot: Double, val cnt: Int = 1) {
      def ++(v: Double) = new AvgCollector(tot + v, cnt + 1)
      def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt)
      def avg = if (cnt > 0) tot / cnt else 0.0
    }
    
    rdd2 = {
      rdd
      .aggregateByKey( new AvgCollector(0.0,0) )(_ ++ _, _ combine _ )
      .map{ case (k,v) => (k, v.avg) }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-02-18
      相关资源
      最近更新 更多