更新
这个答案仍然有效且信息丰富,尽管自 2.2/2.3 以来情况有所好转,它添加了对 Set、Seq、Map、Date、Timestamp 和BigDecimal。如果您坚持只使用案例类和通常的 Scala 类型来制作类型,那么您应该可以只使用 SQLImplicits 中的隐式。
不幸的是,几乎没有添加任何内容来帮助解决此问题。在Encoders.scala 或SQLImplicits.scala 中搜索@since 2.0.0 发现主要与原始类型有关(以及对案例类的一些调整)。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。有了这些,接下来是一些技巧,考虑到我们目前可以使用的东西,这些技巧可以做得尽可能好。作为一个预先的免责声明:这不会完美运行,我会尽我所能明确和预先说明所有限制。
究竟是什么问题
当你想要创建一个数据集时,Spark“需要一个编码器(用于将 T 类型的 JVM 对象转换为内部 Spark SQL 表示和从内部 Spark SQL 表示转换),该编码器通常通过来自 SparkSession 的隐式自动创建,或者可以是通过在Encoders 上调用静态方法显式创建”(取自docs on createDataset)。编码器将采用Encoder[T] 的形式,其中T 是您正在编码的类型。第一个建议是添加import spark.implicits._(它为您提供these 隐式编码器),第二个建议是使用this 一组编码器相关函数显式传入隐式编码器。
常规类没有可用的编码器,所以
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
会给你以下隐式相关的编译时错误:
找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_ 将在未来版本中添加对序列化其他类型的支持
但是,如果你在扩展 Product 的某个类中包装你刚刚用于获取上述错误的任何类型,错误会令人困惑地延迟到运行时,所以
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
编译得很好,但在运行时失败
java.lang.UnsupportedOperationException:未找到 MyObj 的编码器
这样做的原因是 Spark 使用隐式创建的编码器实际上仅在运行时生成(通过 scala relfection)。在这种情况下,Spark 在编译时的所有检查是最外面的类扩展了Product(所有案例类都这样做),并且只在运行时意识到它仍然不知道如何处理MyObj(同样的问题如果我试图创建一个Dataset[(Int,MyObj)],就会发生这种情况——Spark 会等到运行时才会在MyObj 上吐槽)。这些是急需解决的核心问题:
- 一些扩展
Product 的类尽管在运行时总是崩溃并且可以编译
- 无法为嵌套类型传入自定义编码器(我无法为 Spark 提供仅用于
MyObj 的编码器,以便它知道如何编码 Wrap[MyObj] 或 (Int,MyObj))。
只需使用kryo
大家建议的解决方案是使用kryo编码器。
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
不过,这很快就会变得非常乏味。特别是如果您的代码正在操作各种数据集、连接、分组等。您最终会积累一堆额外的隐式。那么,为什么不直接做一个隐式来自动完成这一切呢?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
现在,我似乎可以做任何我想做的事情(下面的示例在 spark-shell 中不起作用,其中 spark.implicits._ 是自动导入的)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
或者差不多。问题是使用kryo 会导致 Spark 仅将数据集中的每一行存储为平面二进制对象。对于map、filter、foreach 来说已经足够了,但是对于像join 这样的操作,Spark 确实需要将它们分成列。检查d2 或d3 的架构,您会看到只有一个二进制列:
d2.printSchema
// root
// |-- value: binary (nullable = true)
元组的部分解决方案
因此,使用 Scala 中隐含的魔力(更多内容请参见 6.26.3 Overloading Resolution),我可以为自己制作一系列隐含,这些隐含将尽可能好地完成工作,至少对于元组而言,并且可以很好地与现有隐含一起工作:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
然后,有了这些隐式,我可以让我上面的例子工作,尽管有一些列重命名
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
我还没有弄清楚如何在不重命名的情况下默认获取预期的元组名称(_1、_2、...) - 如果其他人想玩这个,this 是名称"value" 被引入,this 是通常添加元组名称的位置。然而,关键是我现在有了一个很好的结构化模式:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
所以,总而言之,这个解决方法:
- 允许我们为元组获取单独的列(这样我们就可以再次加入元组,耶!)
- 我们可以再次依赖隐式(因此无需到处传递
kryo)
- 几乎完全向后兼容
import spark.implicits._(涉及一些重命名)
-
不让我们加入
kyro 序列化的二进制列,更不用说那些可能有的字段了
- 具有将一些元组列重命名为“值”的令人不快的副作用(如有必要,可以通过转换
.toDF、指定新列名称并转换回数据集和模式名称来撤消此操作似乎是通过最需要它们的连接来保留的)。
一般类的部分解决方案
这个不太愉快,没有好的解决方案。但是,既然我们有了上面的元组解决方案,我预感来自另一个答案的隐式转换解决方案也不会那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集之后,您可能会使用数据框方法重命名列。如果一切顺利,这真的是一个改进,因为我现在可以在我的类的字段上执行连接。如果我只使用一个平面二进制 kryo 序列化程序,那是不可能的。
这里有一个例子,它可以做所有事情:我有一个类MyObj,它的字段类型为Int、java.util.UUID 和Set[String]。第一个照顾自己。第二个,虽然我可以使用kryo 进行序列化,但如果存储为String 会更有用(因为UUIDs 通常是我想要加入的东西)。第三个真的只是属于一个二进制列。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
现在,我可以使用这个机器创建一个具有良好架构的数据集:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
架构向我展示了我可以使用正确名称的列和前两个我可以加入的东西。
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)