【问题标题】:Spark: Summary statisticsSpark:汇总统计
【发布时间】:2019-02-21 15:24:08
【问题描述】:

我正在尝试使用 Spark 汇总统计信息,如下所述:https://spark.apache.org/docs/1.1.0/mllib-statistics.html

根据 Spark 文档:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.DenseVector

val observations: RDD[Vector] = ... // an RDD of Vectors

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary =     Statistics.colStats(observations)

我在构建 observations:RDD[Vector] 对象时遇到问题。我试试:

scala> val data:Array[Double] = Array(1, 2, 3, 4, 5)
data: Array[Double] = Array(1.0, 2.0, 3.0, 4.0, 5.0)

scala> val v = new DenseVector(data)
v: org.apache.spark.mllib.linalg.DenseVector = [1.0,2.0,3.0,4.0,5.0]

scala> val observations = sc.parallelize(Array(v))
observations:   org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector] =   ParallelCollectionRDD[3] at parallelize at <console>:19

scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
<console>:21: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
Note: org.apache.spark.mllib.linalg.DenseVector <: org.apache.spark.mllib.linalg.Vector, but class RDD is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
val summary: MultivariateStatisticalSummary =  Statistics.colStats(observations)

问题:

1) 我应该如何将 DenseVector 转换为 Vector?

2) 在实际程序中,而不是双精度数组,我可以使用以下方法获取从 RDD 获得的集合的统计信息:

def countByKey(): Map[K, Long]
//Count the number of elements for each key, and return the result to the master as a Map.

所以我必须这样做:

 myRdd.countByKey().values.map(_.toDouble)

这没有多大意义,因为我现在不得不使用常规的 Scala 集合,而不是使用 RDD,这些集合有时会停止适应内存。 Spark 分布式计算的所有优势都丧失了。

如何以可扩展的方式解决这个问题?

更新

在我的情况下,我有:

val cnts: org.apache.spark.rdd.RDD[Int] = prodCntByCity.map(_._2) // get product counts only 
val doubleCnts: org.apache.spark.rdd.RDD[Double] = cnts.map(_.toDouble)

如何将doubleCnts转换成observations: RDD[Vector]

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    1) 你不需要投射,你只需要输入:

    val observations = sc.parallelize(Array(v: Vector))
    

    2) 使用aggregateByKey(将所有键映射到1,并通过求和减少)而不是countByKey

    【讨论】:

    • 谢谢!现在还有一个问题:如何将org.apache.spark.rdd.RDD[Double] 转换为RDD[Vector]
    • 首先决定如何将Double 转换为Vector,然后再将map 转换为map
    • 在我的情况下,我只有一列作为 RDD 的双重计数,我需要获得基本的统计数据,例如平均值、标准差、分位数......如果我理解使用 MLib Statistics 的权利对于我的情况,我需要构建val observations: RDD[Vector] - RDD 中包含的单个向量。所以要实现这一点,我需要转换 RDD[Double] --> DenseVector[Double] --&gt; RDD[Vector]。这种转换迫使我离开 RDD 空间并进入内存 DenseVector[Double] 结构。这是对的吗?如果是这种情况,那么 RDD 的所有优势都将失去......
    • 如果您的RDD[Vector] 只包含一个Vector,那么作为RDD 当然没有任何优势。 colStats 旨在与许多小的Vectors 中的RDD 一起使用。如果您只想要单个数据系列的均值/方差等,您需要创建一个 RDD 的 1 元素 Vectors,而不是包含大 Vector 的 1 元素 RDD
    • colStats 给你向量的第一个元素的平均值,向量的第二个元素的平均值等等。如果您的向量太长(难以置信),您只需拨打两次电话colStats
    【解决方案2】:

    DenseVector 具有压缩功能。因此您可以将 RDD[DenseVector] 更改为 RDD[Vector] 为:

        val st =observations.map(x=>x.compressed)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-05-18
      • 2011-07-10
      • 1970-01-01
      • 1970-01-01
      • 2019-03-19
      • 1970-01-01
      • 2017-02-25
      • 2023-03-24
      相关资源
      最近更新 更多