【问题标题】:RuntimeException when converting Dataset<Row> to JavaRDD<Row> and then Dataframe将 Dataset<Row> 转换为 JavaRDD<Row> 然后 Dataframe 时出现 RuntimeException
【发布时间】:2018-12-18 02:45:38
【问题描述】:

我正在尝试使用以下代码将索引列添加到数据集,将其转换为 JavaPairRDD。

// ds is a Dataset<Row>
JavaPairRDD<Row, Long> indexedRDD = ds.toJavaRDD()
    .zipWithIndex();

// Now I am converting JavaPairRDD to JavaRDD as below.
JavaRDD<Row> rowRDD = indexedRDD
    .map(tuple -> RowFactory.create(tuple._1(),tuple._2().intValue()));

// I am converting the RDD back to dataframe and it doesnt work.
Dataset<Row> authDf = session
    .createDataFrame(rowRDD, ds.schema().add("ID", DataTypes.IntegerType));

// Below is the ds schema(Before adding the ID column).
ds.schema()

root
 |-- user: short (nullable = true)
 |-- score: long (nullable = true)
 |-- programType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- item: string (nullable = true)
 |-- playType: string (nullable = true)
 |-- userf: integer (nullable = true)

以上代码抛出如下错误信息:

**Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 
times, most  recent failure: Lost task 0.3 in stage 21.0 (TID 658, 
sl73caehdn0406.visa.com, executor 1):

java.lang.RuntimeException: 
Error while encoding: java.lang.RuntimeException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not 
a valid external type for schema of smallint**

【问题讨论】:

  • 由于 spark 的懒惰性质并且仅在有动作时才采取行动,因此尚不清楚哪个步骤正在引发异常。作为一个简单的调试步骤,您可以在每个步骤之后使用.show() 打印您的结构,这通常只会打印前 20 行。这将迫使 spark 按顺序计算每个步骤,当错误发生时而不是在工作流之后遇到错误。

标签: java apache-spark rdd


【解决方案1】:

您在第二条语句中创建的元组由两列组成:一列是对象(由初始数据集中的所有列组成),第二列是整数。 第二个元组列进入第二个结果列,它是长类型的。 第一个元组列进入第一个结果列,它的类型是短的 - 作为一个对象,即 GenericRowWithSchema,这会导致错误。

您应该使用 7 个参数执行 RowFactory.create(),每个结果列一个。

【讨论】:

    猜你喜欢
    • 2016-07-24
    • 1970-01-01
    • 2021-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-22
    • 2022-11-07
    • 2020-10-21
    相关资源
    最近更新 更多