【问题标题】:Generate a Spark StructType / Schema from a case class从案例类生成 Spark StructType / Schema
【发布时间】:2016-08-13 06:21:28
【问题描述】:

如果我想从case class 创建一个StructType(即DataFrame.schema),有没有办法在不创建DataFrame 的情况下做到这一点?我可以轻松做到:

case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema

但是当我想要的只是架构时,实际创建 DataFrame 似乎有点过头了。

(如果你好奇,问题背后的原因是我定义了一个UserDefinedAggregateFunction,为此你重写了几个返回StructTypes的方法,我使用了案例类。)

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    您可以按照SQLContext.createDataFrame 的方式进行操作:

    import org.apache.spark.sql.catalyst.ScalaReflection
    val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]
    

    【讨论】:

    • 谢谢——还没有完全进入o.a.s.sql.catalyst。如果我一直在思考,我会像你一样从createDataFrame 开始。 :-(
    • 甜,你甚至可以做到...schemaFor[(Long,Int,Long)]...
    • 不用担心 - 我只是很容易找到它,因为我前一段时间自己尝试过类似的东西;)是的 - 对任何 Product 都有效,谢谢 Scala!
    • 我有点喜欢toDF 的版本,只是为了简洁
    • 你知道如何用这种方法做一个 T 类型的数组吗?我尝试将 T 包装在另一个案例类中,但它没有按预期工作
    【解决方案2】:

    我知道这个问题已经有将近一年的历史了,但我遇到了它,并认为其他人也可能想知道我刚刚学会使用这种方法:

    import org.apache.spark.sql.Encoders
    val mySchema = Encoders.product[MyCaseClass].schema
    

    【讨论】:

    • 注意 - Encoders 对象被标记为 @Experimental 注释:“一个实验性的面向用户的 API。实验性 API 可能会在 Spark 的次要版本中更改或被删除,或者被用作一流的 Spark API。”发现这是为了找出不同方法的优缺点(当前答案与接受的答案)。
    【解决方案3】:

    如果有人想为自定义 Java bean 执行此操作:

    ExpressionEncoder.javaBean(Event.class).schema().json()
    

    【讨论】:

    • 还有Encoders.bean(Event.class).schema(),我认为它也是如此。
    • 当我使用它来设置架构时,我遇到的问题是上面的函数返回按字母顺序排列的数据成员,而我在文件中的数据列不是。由于它尝试按顺序而不是按名称进行匹配,这会导致数据损坏。关于如何解决这个问题的任何想法?
    【解决方案4】:

    与其手动复制用于创建传递给toDF 的隐式Encoder 对象的逻辑,不如直接使用它(或者更准确地说,以与toDF 相同的方式隐式使用):

    // spark: SparkSession
    
    import spark.implicits._
    
    implicitly[Encoder[MyCaseClass]].schema
    

    不幸的是,这实际上与在其他答案中使用 org.apache.spark.sql.catalystEncoders 存在相同的问题:the Encoder trait 是实验性的。

    这是如何工作的? Seq 上的 toDF 方法来自 DatasetHolder,它是通过 spark.implicits._ 导入的隐式 localSeqToDatasetHolder 创建的。该函数的定义如下:

    implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit arg0: Encoder[T]): DatasetHolder[T]
    

    如您所见,它需要一个implicitEncoder[T] 参数,对于case class,可以通过newProductEncoder 计算(也可以通过spark.implicits._ 导入)。我们可以通过方便的scala.Predef.implicitly(默认在范围内,因为它来自Predef)重现这个隐式逻辑来为我们的案例类获取Encoder,它将只返回其请求的隐式参数:

    def implicitly[T](implicit e: T): T
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-01-28
      • 1970-01-01
      • 2020-02-26
      • 1970-01-01
      • 2020-12-13
      • 2014-03-20
      • 1970-01-01
      相关资源
      最近更新 更多