【问题标题】:Unable to convert an RDD[Row] to a DataFrame无法将 RDD[Row] 转换为 DataFrame
【发布时间】:2017-01-26 08:19:31
【问题描述】:

对于以下代码 - 其中 DataFrame 被转换为 RDD[Row] 并通过 mapPartitions 附加新列的数据:

 // df is a DataFrame
val dfRdd = df.rdd.mapPartitions {
  val bfMap = df.rdd.sparkContext.broadcast(factorsMap)
  iter =>
    val locMap = bfMap.value
    iter.map { r =>
      val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
      Row(newseq)
    }
}

RDD[Row] 与另一列的输出是正确的:

println("**dfrdd\n" + dfRdd.take(5).mkString("\n"))

**dfrdd
[ArrayBuffer(0021BEC286CC, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 148818)]
[ArrayBuffer(0021BEE7C556, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 26908)]
[ArrayBuffer(8C7F3BFD4B82, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 99942)]
[ArrayBuffer(0021BEC8F8B8, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 53994)]
[ArrayBuffer(10EA59F10C8B, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 1427)]

让我们尝试将RDD[Row] 转换回DataFrame:

val newSchema = df.schema.add(StructField("userf",IntegerType))

现在让我们创建更新后的 DataFrame:

val df2 = df.sqlContext.createDataFrame(dfRdd,newSchema)

新架构看起来正确吗?

newSchema.show()

root
 |-- user: string (nullable = true)
 |-- score: long (nullable = true)
 |-- programType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- item: string (nullable = true)
 |-- playType: string (nullable = true)
 |-- userf: integer (nullable = true)

请注意,我们确实看到了新的 userf 列..

但它不起作用:

println("df2: " + df2.take(1))

Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, 
most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost, executor driver): java.lang.RuntimeException: Error while encoding: 

java.lang.RuntimeException: scala.collection.mutable.ArrayBuffer is not a  
 valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true) AS user#28
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null

那么:这里缺少什么细节?

注意:我不同的方法感兴趣:例如withColumnDatasets.. 让我们只考虑方法:

  • 转换为 RDD
  • 向每一行添加新的数据元素
  • 更新新列的架构
  • 将新的 RDD+schema 转换回 DataFrame

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    调用Row的构造函数好像有个小错误:

    val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
    Row(newseq)
    

    这个“构造函数”(实际上是apply方法)的签名是:

    def apply(values: Any*): Row
    

    当您传递Seq[Any] 时,它被视为Seq[Any] 类型的单个值。你想传递这个序列的元素,因此你应该使用:

    val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
    Row(newseq: _*)
    

    修复此问题后,行将与您构建的架构相匹配,您将获得预期的结果。

    【讨论】:

    • 你说得对!现在你可以在代表中领先我了!
    • 非常感谢!我对 scala 和 spark 很陌生,这让我花了很长时间才发现。 Here 也是关于 : _* 符号的更多信息
    • 我刚刚遇到了这个问答 - 看到我投了赞成票 .. 然后 .. 是提问者!抱歉,我不能再次投票了
    猜你喜欢
    • 2017-06-13
    • 1970-01-01
    • 2019-09-09
    • 2016-12-25
    • 1970-01-01
    • 2016-08-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多