【问题标题】:Convert Spark's DataFrame to RDD[Vector]将 Spark 的 DataFrame 转换为 RDD[Vector]
【发布时间】:2017-06-02 10:14:03
【问题描述】:

当我尝试使用以下代码将 Spark 的 DataFrame 转换为 RDD[org.apache.spark.mllib.linalg.Vector] 时:

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vectors

val df = sqlContext.createDataFrame(
  Seq((0.1, 0.2, 0.4))
).toDF("t1", "t2", "t3")

df.rdd.map{ case Row(row: Seq[_]) =>
  Vectors.dense(row.asInstanceOf[Seq[Double]].toArray)
}.collect

我收到这样的错误消息:

scala.MatchError: [0.1,0.2,0.4] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

然后我尝试了另一种方法:

df.content.rdd.map{ case row =>
  Vectors.dense(row.toSeq.toArray.map{
    x => x.asInstanceOf[Double]
  })
}.collect

结果很好。

虽然在official version of Spark-2.2.0-SNAPSHOT 中引入了第一种方法,在将row 转换为Array[Double] 时,它不起作用。

谁能找出原因?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql apache-spark-mllib


    【解决方案1】:

    这两种方法的作用不同。在第一种情况下,您尝试使用单个 ArrayType 列与 Row 匹配。由于您的输入包含三列,MatchException 是预期结果。这仅在您将列收集为数组时才有效,例如

    df.select(array(df.columns.map(col(_)): _*)).rdd.map { 
      case Row(xs: Seq[Double @unchecked]) => xs 
    }
    

    df.select(array(df.columns.map(col(_)): _*)).rdd.map(_.getSeq[Double](0))
    

    在第二种情况下,您将 row 转换为 Seq[Any],这会为您提供一系列字段值。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-03-23
      • 2018-03-05
      • 1970-01-01
      • 1970-01-01
      • 2017-06-13
      • 2018-10-21
      • 1970-01-01
      相关资源
      最近更新 更多