Posted: 2018-12-18 02:45:38
Question:
I am trying to add an index column to a Dataset by converting it to a JavaPairRDD, using the code below.
// ds is a Dataset<Row>
JavaPairRDD<Row, Long> indexedRDD = ds.toJavaRDD()
.zipWithIndex();
// Now I am converting JavaPairRDD to JavaRDD as below.
JavaRDD<Row> rowRDD = indexedRDD
.map(tuple -> RowFactory.create(tuple._1(),tuple._2().intValue()));
// Converting the RDD back to a DataFrame fails here.
Dataset<Row> authDf = session
.createDataFrame(rowRDD, ds.schema().add("ID", DataTypes.IntegerType));
// Below is the ds schema (before adding the ID column).
ds.schema()
root
|-- user: short (nullable = true)
|-- score: long (nullable = true)
|-- programType: string (nullable = true)
|-- source: string (nullable = true)
|-- item: string (nullable = true)
|-- playType: string (nullable = true)
|-- userf: integer (nullable = true)
The code above throws the following error:
**Job aborted due to stage failure: Task 0 in stage 21.0 failed 4
times, most recent failure: Lost task 0.3 in stage 21.0 (TID 658,
sl73caehdn0406.visa.com, executor 1):
java.lang.RuntimeException:
Error while encoding: java.lang.RuntimeException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not
a valid external type for schema of smallint**
Comments:
-
Because of Spark's lazy evaluation, nothing runs until an action is triggered, so it is not clear which step is actually raising the exception. As a simple debugging step, you can call
.show() after each step to print your structure; by default it prints only the first 20 rows. This forces Spark to evaluate each step in order, so you hit the error at the point where it occurs rather than at the end of the workflow.
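A likely cause of the error itself: `RowFactory.create(tuple._1(), tuple._2().intValue())` builds a two-field Row whose first field is the *entire* original Row, so the encoder finds a `GenericRowWithSchema` where the schema expects a `smallint` (the `user` column). The fix would be to flatten the original row's values and append the index. A minimal sketch of that flattening step, using plain `Object[]` arrays in place of Spark's `Row` for illustration (in Spark the result array would be passed to `RowFactory.create(flattened)`):

```java
// Sketch: append an index to a row's values instead of nesting the
// whole row as a single field. Object[] stands in for Spark Row values.
public class FlattenSketch {
    static Object[] withIndex(Object[] rowValues, long index) {
        Object[] out = new Object[rowValues.length + 1];
        // Copy the original field values into the new array unchanged.
        System.arraycopy(rowValues, 0, out, 0, rowValues.length);
        // Append the zipWithIndex value, narrowed to int to match
        // the DataTypes.IntegerType used for the "ID" column.
        out[rowValues.length] = (int) index;
        return out;
    }

    public static void main(String[] args) {
        // Example row shaped like (user: short, score: long, programType: string).
        Object[] row = {(short) 1, 10L, "movie"};
        Object[] indexed = withIndex(row, 0L);
        System.out.println(indexed.length); // 4
        System.out.println(indexed[3]);     // 0
    }
}
```

In the question's map lambda, the same idea would read the original fields with `tuple._1().get(i)`, build the array, and return `RowFactory.create(flattened)` so each column lines up with one entry in the extended schema.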
Tags: java apache-spark rdd