【问题标题】:Add ADT column in Spark dataset?在 Spark 数据集中添加 ADT 列?
【发布时间】:2020-04-10 21:23:15
【问题描述】:

我想创建一个包含 ADT 列的数据集。基于这个问题:Encode an ADT / sealed trait hierarchy into Spark DataSet column 我知道,有一个用 kryo 编码的解决方案,但这并没有真正的帮助。 还有另一种更好的方法来解决这个问题。让我们定义以下 ADT:

sealed case class Animal(sound: String)
object Cat extends Animal("miau")
object Dog extends Animal("wuff") 

并定义一个使用Animal的案例类

case class Pet(name: String, sound: Animal)

我现在可以轻松地从 Pet 中创建数据集

val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
ds.show()
+---------+------+
|     name| sound|
+---------+------+
|      Tom|[miau]|
|Beethoven|[wuff]|
+---------+------+

注意,soundStruct,但提取元素很简单:

ds.select("name", "sound.*").show()
+---------+-----+
|name     |sound|
+---------+-----+
|Tom      |miau |
|Beethoven|wuff |
+---------+-----+

实际上这是我想要实现的最终结构。 我面临两个问题。

  1. 通常从案例类继承不是一个好主意
  2. 详尽的模式匹配要求默认情况

问题 2 示例:

 def getSound(animal: Animal): String = animal match {
   case Cat => Cat.sound
   case Dog => Dog.sound
   case _ => ""
 }

为了克服问题 2,我想创建一个密封的抽象类。我也想把它做成产品

sealed abstract class Animal(sound: String) extends Product
case object Cat extends Animal("miau")
case object Dog extends Animal("wuff")

现在问题 2 已处理,不再需要默认情况。但是我无法从 Animal 创建数据集。我得到以下异常: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find sound on class Animal

我真正想要获得的是获得与Option 相同的行为。 我们可以创建一个包含可选字段的案例类:

case class Person(name: String, age: Option[Int])
List(Person("Jack", Some(26)), Person("Julia", None)).toDS.show()
+-----+----+
| name| age|
+-----+----+
| Jack|  26|
|Julia|null|
+-----+----+

我检查了 Option 的实现,它也是一个密封的抽象类,所以我错过了什么? 数据集的 Option 是如何编码的?

更新

抱歉,最后一部分 Option 在这里没有太大意义,因为您需要在数据集中明确写出您希望在最后看到的值。

但问题仍然存在,我如何使用正确的模式匹配对从 ADT 创建的列进行编码。

【问题讨论】:

    标签: scala apache-spark apache-spark-dataset algebraic-data-types apache-spark-encoders


    【解决方案1】:

    我缺少的是我的 Animal 类的 apply 方法。

    sealed abstract class Animal(val sound: String) extends Product with Serializable
      case object Cat extends Animal(sound = "miau")
      case object Dog extends Animal(sound = "wuff")
      object Animal {
        def apply(animal: Animal): String = animal match {
          case Cat => Cat.sound
          case Dog => Dog.sound
        }
      }
    

    使用这个我可以获得几乎想要的结果:

    val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
    ds.show()
    +---------+------+
    |     name| sound|
    +---------+------+
    |      Tom|[miau]|
    |Beethoven|[wuff]|
    +---------+------+
    

    【讨论】:

    • 这永远不会在集群上工作,因为抽象类。它会抛出一个 RTE org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 44, Column 11: Cannot instantiate abstract "Animal"
    • 有趣的是,我在 spark shell(版本 2.4.0-cdh6.3.1,客户端模式)中进行了尝试,并生成了具有所需架构的数据集。我也会尝试在集群模式下运行,使用 jar
    【解决方案2】:

    sanyi14ka 实际上永远不会工作。

    DataSet 中的 ADT/Enum Encoders 的话题并不新鲜,但即使在今天它也不能正常工作。

    您可能会发现这两个链接很有用:

    【讨论】:

      猜你喜欢
      • 2017-09-05
      • 2018-05-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多