【问题标题】:How to store custom objects in Dataset?如何在数据集中存储自定义对象?
【发布时间】:2016-08-07 11:37:41
【问题描述】:

根据Introducing Spark Datasets

在期待 Spark 2.0 的同时,我们计划对数据集进行一些激动人心的改进,具体而言: ... 自定义编码器——虽然我们目前为多种类型自动生成编码器,但我们希望为自定义对象开放一个 API。

并尝试将自定义类型存储在 Dataset 中会导致以下错误:

找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_ 将在以后的版本中添加对序列化其他类型的支持

或:

Java.lang.UnsupportedOperationException: No Encoder found for ....

是否有任何现有的解决方法?


请注意,此问题仅作为社区 Wiki 答案的入口点存在。随时更新/改进问题和答案。

【问题讨论】:

    标签: scala apache-spark apache-spark-dataset apache-spark-encoders


    【解决方案1】:

    更新

    这个答案仍然有效且信息丰富,尽管自 2.2/2.3 以来情况有所好转,它添加了对 SetSeqMapDateTimestampBigDecimal。如果您坚持只使用案例类和通常的 Scala 类型来制作类型,那么您应该可以只使用 SQLImplicits 中的隐式。


    不幸的是,几乎没有添加任何内容来帮助解决此问题。在Encoders.scalaSQLImplicits.scala 中搜索@since 2.0.0 发现主要与原始类型有关(以及对案例类的一些调整)。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。有了这些,接下来是一些技巧,考虑到我们目前可以使用的东西,这些技巧可以做得尽可能好。作为一个预先的免责声明:这不会完美运行,我会尽我所能明确和预先说明所有限制。

    究竟是什么问题

    当你想要创建一个数据集时,Spark“需要一个编码器(用于将 T 类型的 JVM 对象转换为内部 Spark SQL 表示和从内部 Spark SQL 表示转换),该编码器通常通过来自 SparkSession 的隐式自动创建,或者可以是通过在Encoders 上调用静态方法显式创建”(取自docs on createDataset)。编码器将采用Encoder[T] 的形式,其中T 是您正在编码的类型。第一个建议是添加import spark.implicits._(它为您提供these 隐式编码器),第二个建议是使用this 一组编码器相关函数显式传入隐式编码器。

    常规类没有可用的编码器,所以

    import spark.implicits._
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    会给你以下隐式相关的编译时错误:

    找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_ 将在未来版本中添加对序列化其他类型的支持

    但是,如果你在扩展 Product 的某个类中包装你刚刚用于获取上述错误的任何类型,错误会令人困惑地延迟到运行时,所以

    import spark.implicits._
    case class Wrap[T](unwrap: T)
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
    

    编译得很好,但在运行时失败

    java.lang.UnsupportedOperationException:未找到 MyObj 的编码器

    这样做的原因是 Spark 使用隐式创建的编码器实际上仅在运行时生成(通过 scala relfection)。在这种情况下,Spark 在编译时的所有检查是最外面的类扩展了Product(所有案例类都这样做),并且只在运行时意识到它仍然不知道如何处理MyObj(同样的问题如果我试图创建一个Dataset[(Int,MyObj)],就会发生这种情况——Spark 会等到运行时才会在MyObj 上吐槽)。这些是急需解决的核心问题:

    • 一些扩展 Product 的类尽管在运行时总是崩溃并且可以编译
    • 无法为嵌套类型传入自定义编码器(我无法为 Spark 提供仅用于 MyObj 的编码器,以便它知道如何编码 Wrap[MyObj](Int,MyObj))。

    只需使用kryo

    大家建议的解决方案是使用kryo编码器。

    import spark.implicits._
    class MyObj(val i: Int)
    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    不过,这很快就会变得非常乏味。特别是如果您的代码正在操作各种数据集、连接、分组等。您最终会积累一堆额外的隐式。那么,为什么不直接做一个隐式来自动完成这一切呢?

    import scala.reflect.ClassTag
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
      org.apache.spark.sql.Encoders.kryo[A](ct)
    

    现在,我似乎可以做任何我想做的事情(下面的示例在 spark-shell 中不起作用,其中 spark.implicits._ 是自动导入的)

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
    val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
    

    或者差不多。问题是使用kryo 会导致 Spark 仅将数据集中的每一行存储为平面二进制对象。对于mapfilterforeach 来说已经足够了,但是对于像join 这样的操作,Spark 确实需要将它们分成列。检查d2d3 的架构,您会看到只有一个二进制列:

    d2.printSchema
    // root
    //  |-- value: binary (nullable = true)
    

    元组的部分解决方案

    因此,使用 Scala 中隐含的魔力(更多内容请参见 6.26.3 Overloading Resolution),我可以为自己制作一系列隐含,这些隐含将尽可能好地完成工作,至少对于元组而言,并且可以很好地与现有隐含一起工作:

    import org.apache.spark.sql.{Encoder,Encoders}
    import scala.reflect.ClassTag
    import spark.implicits._  // we can still take advantage of all the old implicits
    
    implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
    
    implicit def tuple2[A1, A2](
      implicit e1: Encoder[A1],
               e2: Encoder[A2]
    ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
    
    implicit def tuple3[A1, A2, A3](
      implicit e1: Encoder[A1],
               e2: Encoder[A2],
               e3: Encoder[A3]
    ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
    
    // ... you can keep making these
    

    然后,有了这些隐式,我可以让我上面的例子工作,尽管有一些列重命名

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
    val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
    

    我还没有弄清楚如何在不重命名的情况下默认获取预期的元组名称(_1_2、...) - 如果其他人想玩这个,this 是名称"value" 被引入,this 是通常添加元组名称的位置。然而,关键是我现在有了一个很好的结构化模式:

    d4.printSchema
    // root
    //  |-- _1: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    //  |-- _2: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    

    所以,总而言之,这个解决方法:

    • 允许我们为元组获取单独的列(这样我们就可以再次加入元组,耶!)
    • 我们可以再次依赖隐式(因此无需到处传递kryo
    • 几乎完全向后兼容import spark.implicits._(涉及一些重命名)
    • 让我们加入kyro 序列化的二进制列,更不用说那些可能有的字段了
    • 具有将一些元组列重命名为“值”的令人不快的副作用(如有必要,可以通过转换 .toDF、指定新列名称并转换回数据集和模式名称来撤消此操作似乎是通过最需要它们的连接来保留的)。

    一般类的部分解决方案

    这个不太愉快,没有好的解决方案。但是,既然我们有了上面的元组解决方案,我预感来自另一个答案的隐式转换解决方案也不会那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集之后,您可能会使用数据框方法重命名列。如果一切顺利,这真的是一个改进,因为我现在可以在我的类的字段上执行连接。如果我只使用一个平面二进制 kryo 序列化程序,那是不可能的。

    这里有一个例子,它可以做所有事情:我有一个类MyObj,它的字段类型为Intjava.util.UUIDSet[String]。第一个照顾自己。第二个,虽然我可以使用kryo 进行序列化,但如果存储为String 会更有用(因为UUIDs 通常是我想要加入的东西)。第三个真的只是属于一个二进制列。

    class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
    
    // alias for the type to convert to and from
    type MyObjEncoded = (Int, String, Set[String])
    
    // implicit conversions
    implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
    implicit def fromEncoded(e: MyObjEncoded): MyObj =
      new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
    

    现在,我可以使用这个机器创建一个具有良好架构的数据集:

    val d = spark.createDataset(Seq[MyObjEncoded](
      new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
      new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
    )).toDF("i","u","s").as[MyObjEncoded]
    

    架构向我展示了我可以使用正确名称的列和前两个我可以加入的东西。

    d.printSchema
    // root
    //  |-- i: integer (nullable = false)
    //  |-- u: string (nullable = true)
    //  |-- s: binary (nullable = true)
    

    【讨论】:

    • @AlexeyS 我不这么认为。但你为什么要这样?为什么你不能摆脱我提出的最后一个解决方案?如果您可以将数据放在 JSON 中,您应该能够提取字段并将它们放在案例类中......
    • 不幸的是,这个答案的底线是没有有效的解决方案。
    • @combinatorist 我的理解是,从性能的角度来看,数据集和数据帧(但不是 RDD,因为它们不需要编码器!)是等效的。不要低估数据集的类型安全性!仅仅因为 Spark 在内部使用了大量的反射、强制转换等,并不意味着您不应该关心所公开接口的类型安全。但它确实让我对创建自己的基于数据集的类型安全函数感觉更好,这些函数在后台使用数据帧。
    • 我有一个作为魅力的解决方案。它包括: 1. 在自定义类上定义 sparkSql UDT 2. 注册它 3. 将您的类包含在产品中(例如 Tuple1)
    • 您是否考虑过使用 UDTRegistration?
    【解决方案2】:
    1. 使用通用编码器。

      目前有两种通用编码器kryojavaSerialization,其中后一种被明确描述为:

      效率极低,只能作为最后的手段。

      假设以下课程

      class Bar(i: Int) {
        override def toString = s"bar $i"
        def bar = i
      }
      

      您可以通过添加隐式编码器来使用这些编码器:

      object BarEncoders {
        implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
        org.apache.spark.sql.Encoders.kryo[Bar]
      }
      

      可以如下组合使用:

      object Main {
        def main(args: Array[String]) {
          val sc = new SparkContext("local",  "test", new SparkConf())
          val sqlContext = new SQLContext(sc)
          import sqlContext.implicits._
          import BarEncoders._
      
          val ds = Seq(new Bar(1)).toDS
          ds.show
      
          sc.stop()
        }
      }
      

      它将对象存储为binary 列,因此当转换为DataFrame 时,您会得到以下架构:

      root
       |-- value: binary (nullable = true)
      

      也可以使用kryo 编码器对特定字段的元组进行编码:

      val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
      
      spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
      // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
      

      请注意,我们在这里不依赖隐式编码器,而是显式传递编码器,因此这很可能不适用于 toDS 方法。

    2. 使用隐式转换:

      提供可编码表示和自定义类之间的隐式转换,例如:

      object BarConversions {
        implicit def toInt(bar: Bar): Int = bar.bar
        implicit def toBar(i: Int): Bar = new Bar(i)
      }
      
      object Main {
        def main(args: Array[String]) {
          val sc = new SparkContext("local",  "test", new SparkConf())
          val sqlContext = new SQLContext(sc)
          import sqlContext.implicits._
          import BarConversions._
      
          type EncodedBar = Int
      
          val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
          val barsDS = bars.toDS
      
          barsDS.show
          barsDS.map(_.bar).show
      
          sc.stop()
        }
      }
      

    相关问题:

    【讨论】:

    • 解决方案 1 似乎不适用于类型化集合(至少 Set)我得到 Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
    • @VictorP。恐怕在这种情况下,您将需要特定类型的编码器(kryo[Set[Bar]]。同样,如果类包含字段Bar,则您需要整个对象的编码器。这些是非常粗略的方法。
    • @zero323 我面临同样的问题。你能举一个如何编码整个项目的代码示例吗?非常感谢!
    • @Rock 我不确定你所说的“整个项目”是什么意思
    • @zero323 根据您的评论,“如果类包含 Bar 字段,则您需要整个对象的编码器”。我的问题是如何编码这个“整个项目”?
    【解决方案3】:

    您可以使用 UDTRegistration,然后使用案例类、元组等...都可以与您的用户定义类型一起正常工作!

    假设您要使用自定义枚举:

    trait CustomEnum { def value:String }
    case object Foo extends CustomEnum  { val value = "F" }
    case object Bar extends CustomEnum  { val value = "B" }
    object CustomEnum {
      def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
    }
    

    这样注册:

    // First define a UDT class for it:
    class CustomEnumUDT extends UserDefinedType[CustomEnum] {
      override def sqlType: DataType = org.apache.spark.sql.types.StringType
      override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
      // Note that this will be a UTF8String type
      override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
      override def userClass: Class[CustomEnum] = classOf[CustomEnum]
    }
    
    // Then Register the UDT Class!
    // NOTE: you have to put this file into the org.apache.spark package!
    UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
    

    那就用它吧!

    case class UsingCustomEnum(id:Int, en:CustomEnum)
    
    val seq = Seq(
      UsingCustomEnum(1, Foo),
      UsingCustomEnum(2, Bar),
      UsingCustomEnum(3, Foo)
    ).toDS()
    seq.filter(_.en == Foo).show()
    println(seq.collect())
    

    假设您要使用多态记录:

    trait CustomPoly
    case class FooPoly(id:Int) extends CustomPoly
    case class BarPoly(value:String, secondValue:Long) extends CustomPoly
    

    ... 并像这样使用它:

    case class UsingPoly(id:Int, poly:CustomPoly)
    
    Seq(
      UsingPoly(1, new FooPoly(1)),
      UsingPoly(2, new BarPoly("Blah", 123)),
      UsingPoly(3, new FooPoly(1))
    ).toDS
    
    polySeq.filter(_.poly match {
      case FooPoly(value) => value == 1
      case _ => false
    }).show()
    

    您可以编写一个自定义 UDT,将所有内容编码为字节(我在这里使用 java 序列化,但检测 Spark 的 Kryo 上下文可能更好)。

    首先定义UDT类:

    class CustomPolyUDT extends UserDefinedType[CustomPoly] {
      val kryo = new Kryo()
    
      override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
      override def serialize(obj: CustomPoly): Any = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
    
        bos.toByteArray
      }
      override def deserialize(datum: Any): CustomPoly = {
        val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
        val ois = new ObjectInputStream(bis)
        val obj = ois.readObject()
        obj.asInstanceOf[CustomPoly]
      }
    
      override def userClass: Class[CustomPoly] = classOf[CustomPoly]
    }
    

    然后注册:

    // NOTE: The file you do this in has to be inside of the org.apache.spark package!
    UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
    

    那你就可以用了!

    // As shown above:
    case class UsingPoly(id:Int, poly:CustomPoly)
    
    Seq(
      UsingPoly(1, new FooPoly(1)),
      UsingPoly(2, new BarPoly("Blah", 123)),
      UsingPoly(3, new FooPoly(1))
    ).toDS
    
    polySeq.filter(_.poly match {
      case FooPoly(value) => value == 1
      case _ => false
    }).show()
    

    【讨论】:

    • 我看不到你的 kryo 用在哪里(在 CustomPolyUDT 中)
    • 我正在尝试在我的项目中定义一个 UDT,但我收到此错误“无法从该位置访问符号 UserDefinedType”。有什么帮助吗?
    • 嗨@RijoJoseph。您需要在您的项目中创建一个包 org.apache.spark 并将您的 UDT 代码放入其中。
    • 我尝试了这种方法,将我的代码放在 org.apache.spark 的包中。并打电话登记。但仍然收到关于我的枚举没有编码器的错误...?
    【解决方案4】:

    编码器在Spark2.0 中的工作方式大致相同。而Kryo 仍然是推荐的serialization 选择。

    您可以使用 spark-shell 查看以下示例

    scala> import spark.implicits._
    import spark.implicits._
    
    scala> import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.Encoders
    
    scala> case class NormalPerson(name: String, age: Int) {
     |   def aboutMe = s"I am ${name}. I am ${age} years old."
     | }
    defined class NormalPerson
    
    scala> case class ReversePerson(name: Int, age: String) {
     |   def aboutMe = s"I am ${name}. I am ${age} years old."
     | }
    defined class ReversePerson
    
    scala> val normalPersons = Seq(
     |   NormalPerson("Superman", 25),
     |   NormalPerson("Spiderman", 17),
     |   NormalPerson("Ironman", 29)
     | )
    normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))
    
    scala> val ds1 = sc.parallelize(normalPersons).toDS
    ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]
    
    scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
    ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
    
    scala> ds1.show()
    +---------+---+
    |     name|age|
    +---------+---+
    | Superman| 25|
    |Spiderman| 17|
    |  Ironman| 29|
    +---------+---+
    
    scala> ds2.show()
    +----+---------+
    |name|      age|
    +----+---------+
    |  25| Superman|
    |  17|Spiderman|
    |  29|  Ironman|
    +----+---------+
    
    scala> ds1.foreach(p => println(p.aboutMe))
    I am Ironman. I am 29 years old.
    I am Superman. I am 25 years old.
    I am Spiderman. I am 17 years old.
    
    scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
    ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
    
    scala> ds2.foreach(p => println(p.aboutMe))
    I am 17. I am Spiderman years old.
    I am 25. I am Superman years old.
    I am 29. I am Ironman years old.
    

    到目前为止] 目前范围内没有 appropriate encoders,因此我们的人员没有被编码为 binary 值。但是一旦我们使用Kryo 序列化提供一些implicit 编码器,这种情况就会改变。

    // Provide Encoders
    
    scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
    normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]
    
    scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
    reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]
    
    // Ecoders will be used since they are now present in Scope
    
    scala> val ds3 = sc.parallelize(normalPersons).toDS
    ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]
    
    scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
    ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]
    
    // now all our persons show up as binary values
    scala> ds3.show()
    +--------------------+
    |               value|
    +--------------------+
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    +--------------------+
    
    scala> ds4.show()
    +--------------------+
    |               value|
    +--------------------+
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    +--------------------+
    
    // Our instances still work as expected    
    
    scala> ds3.foreach(p => println(p.aboutMe))
    I am Ironman. I am 29 years old.
    I am Spiderman. I am 17 years old.
    I am Superman. I am 25 years old.
    
    scala> ds4.foreach(p => println(p.aboutMe))
    I am 25. I am Superman years old.
    I am 29. I am Ironman years old.
    I am 17. I am Spiderman years old.
    

    【讨论】:

    • 当我们执行.show时,如何在使用编码器后转换回正常的非二进制值?
    【解决方案5】:

    在 Java Bean 类的情况下,这可能很有用

    import spark.sqlContext.implicits._
    import org.apache.spark.sql.Encoders
    implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
    

    现在您可以简单地将 dataFrame 读取为自定义 DataFrame

    dataFrame.as[MyClass]
    

    这将创建一个自定义类编码器,而不是二进制编码器。

    【讨论】:

      【解决方案6】:

      我的示例将使用 Java,但我认为适应 Scala 并不困难。

      只要Fruit 是一个简单的Java Bean,我就可以使用spark.createDatasetEncoders.bean 成功地将RDD<Fruit> 转换为Dataset<Fruit>

      第 1 步:创建简单的 Java Bean。

      public class Fruit implements Serializable {
          private String name  = "default-fruit";
          private String color = "default-color";
      
          // AllArgsConstructor
          public Fruit(String name, String color) {
              this.name  = name;
              this.color = color;
          }
      
          // NoArgsConstructor
          public Fruit() {
              this("default-fruit", "default-color");
          }
      
          // ...create getters and setters for above fields
          // you figure it out
      }
      

      在 DataBricks 人加强他们的编码器之前,我会坚持使用原始类型和字符串作为字段的类。 如果您有一个包含嵌套对象的类,请创建另一个简单的 Java Bean,并将其所有字段都展平,这样您就可以使用 RDD 转换将复杂类型映射到更简单的类型。 当然,这有点额外的工作,但我想这对使用平面模式的性能有很大帮助。

      第 2 步:从 RDD 获取数据集

      SparkSession spark = SparkSession.builder().getOrCreate();
      JavaSparkContext jsc = new JavaSparkContext();
      
      List<Fruit> fruitList = ImmutableList.of(
          new Fruit("apple", "red"),
          new Fruit("orange", "orange"),
          new Fruit("grape", "purple"));
      JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);
      
      
      RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
      Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
      Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
      

      瞧!起泡、冲洗、重复。

      【讨论】:

      • 我建议指出,对于简单的结构,最好将它们存储在原生 Spark 类型中,而不是将它们序列化为 blob。它们在 Python 网关中工作得更好,在 Parquet 中更透明,甚至可以转换为相同形状的结构。
      【解决方案7】:

      对于可能遇到我这种情况的人,我也将我的答案放在这里。

      具体来说,

      1. 我正在从 SQLContext 中读取“设置类型数据”。所以原始数据格式是DataFrame。

        val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

        +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

      2. 然后使用带有 mutable.WrappedArray 类型的 rdd.map() 将其转换为 RDD。

        sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

        结果:

        (1,Set(1))

      【讨论】:

        【解决方案8】:

        除了已经给出的建议之外,我最近发现的另一个选择是您可以声明您的自定义类,包括特征org.apache.spark.sql.catalyst.DefinedByConstructorParams

        如果类的构造函数使用 ExpressionEncoder 可以理解的类型,即原始值和标准集合,则此方法有效。当您无法将类声明为案例类,但又不想在每次将其包含在数据集中时都使用 Kryo 对其进行编码时,它会派上用场。

        例如,我想声明一个包含 Breeze 向量的案例类。唯一能够处理这种情况的编码器通常是 Kryo。但是,如果我声明了一个扩展 Breeze DenseVector 和 DefinedByConstructorParams 的子类,ExpressionEncoder 就会明白它可以序列化为一个 Doubles 数组。

        我是这样声明的:

        class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
        implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]
        

        现在我可以在数据集中使用SerializableDenseVector(直接或作为产品的一部分),使用简单的 ExpressionEncoder 而无需 Kryo。它的工作方式与 Breeze DenseVector 类似,但序列化为 Array[Double]。

        【讨论】:

          【解决方案9】:

          @Alec 的回答很棒!只是在他/她的答案的这一部分添加评论:

          import spark.implicits._
          case class Wrap[T](unwrap: T)
          class MyObj(val i: Int)
          // ...
          val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
          

          @Alec 提到:

          无法为嵌套类型传递自定义编码器(我无法为 Spark 提供仅用于 MyObj 的编码器,以便它知道如何编码 Wrap[MyObj] 或 (Int,MyObj))。

          似乎是这样,因为如果我为MyObj添加编码器:

          implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
          

          ,还是失败了:

          java.lang.UnsupportedOperationException: No Encoder found for MyObj
          - field (class: "MyObj", name: "unwrap")
          - root class: "Wrap"
            at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)
          

          但请注意重要的错误消息:

          根类:“Wrap”

          它实际上暗示了编码MyObj 是不够的,您必须对包括Wrap[T] 在内的整个链 进行编码。

          所以如果我这样做,它解决了问题

          implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]
          

          因此,@Alec 的评论不是那么正确:

          我无法为 Spark 提供仅用于 MyObj 的编码器,以便它知道如何编码 Wrap[MyObj] 或 (Int,MyObj)

          我们仍然有办法为 Spark 提供 MyObj 的编码器,这样它就知道如何编码 Wrap[MyObj] 或 (Int,MyObj)。

          【讨论】:

            猜你喜欢
            • 2021-01-19
            • 2011-08-26
            • 2019-03-18
            • 2011-01-19
            • 2020-08-14
            相关资源
            最近更新 更多