【发布时间】:2016-10-08 22:52:30
【问题描述】:
Spark 数据集从 Row's 移到 Encoder's for Pojo's/primitives。 Catalyst 引擎使用ExpressionEncoder 来转换 SQL 表达式中的列。然而,似乎没有Encoder 的其他子类可用作我们自己实现的模板。
以下是 Spark 1.X / DataFrames 中的代码示例,但在新机制中无法编译:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
我们得到一个编译器错误
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
那么在某种程度上/某处应该有办法
- 定义/实现我们的自定义编码器
- 在
DataFrame(现在是Row类型的数据集)上执行映射时应用它 - 注册编码器以供其他自定义代码使用
我正在寻找能够成功执行这些步骤的代码。
【问题讨论】:
标签: scala apache-spark apache-spark-dataset apache-spark-encoders