【问题标题】:Rename key in a nested Spark DataFrame Schema (Scala)重命名嵌套 Spark DataFrame Schema (Scala) 中的键
【发布时间】:2019-06-13 01:02:26
【问题描述】:

我有一个用例需要读取嵌套的 JSON 架构并将其写回为 Parquet我的架构会根据我读取数据的那一天发生变化,所以我不知道提前了解确切的模式),因为在我的一些嵌套键中,当我想将其保存为镶木地板时,我有一些类似空格的字符,我收到一个异常,抱怨特殊字符 ,;{}()\\n\\t=

这是一个示例架构,它不是真正的架构键是动态的并且每天都在变化

  val nestedSchema = StructType(Seq(
    StructField("event_time", StringType),
    StructField("event_id", StringType),
    StructField("app", StructType(Seq(
      StructField("environment", StringType),
      StructField("name", StringType),
      StructField("type", StructType(Seq(
        StructField("word tier", StringType), ### This cause problem when you save it as Parquet
        StructField("level", StringType)
    ))
 ))))))

val nestedDF = spark.createDataFrame(sc.emptyRDD[Row], nestedSchema)

myDF.printSchema

输出

root
 |-- event_time: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- app: struct (nullable = true)
 |    |-- environment: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: struct (nullable = true)
 |    |    |-- word tier: string (nullable = true)
 |    |    |-- level: string (nullable = true)

尝试另存为镶木地板

myDF.write
          .mode("overwrite")
          .option("compression", "snappy")
          .parquet("PATH/TO/DESTINATION")

我找到了这样的解决方案

myDF.toDF(myDF
          .schema
          .fieldNames
          .map(name => "[ ,;{}()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)
.write
              .mode("overwrite")
              .option("compression", "snappy")
              .parquet("PATH/TO/DESTINATION")

但它仅适用于父键,不适用于嵌套键。是否有任何递归解决方案?

我的问题不是 this question 的重复,因为我的架构是动态的,我不知道我的密钥是什么。它会根据我正在读取的数据而变化,所以我的解决方案应该是通用的,我需要以某种方式递归地创建相同的架构结构,但键名是正确的。

【问题讨论】:

    标签: scala apache-spark schema parquet


    【解决方案1】:

    基本上,您必须构造一个Column 表达式,它将您的输入转换为具有经过清理的字段名称的类型。为此,您可以使用org.apache.spark.sql.functions.struct 函数,该函数允许您组合其他Columns 来构造结构类型的列。像这样的东西应该可以工作:

      import org.apache.spark.sql.{functions => f}
    
      def sanitizeName(s: String): String = s.replace(" ", "_")
    
      def sanitizeFieldNames(st: StructType, context: String => Column): Column = f.struct(
        st.fields.map { sf =>
          val sanitizedName = sanitizeName(sf.name)
          val sanitizedField = sf.dataType match {
            case struct: StructType =>
              val subcontext = context(sf.name)
              sanitizeFieldNames(struct, subcontext(_))
            case _ => context(sf.name)
          }
          sanitizedField.as(sanitizedName)
        }: _*
      )
    

    你可以这样使用它:

    val df: DataFrame = ...
    
    val appFieldType = df.schema("app").asInstanceOf[StructType]  // or otherwise obtain the field type
    df.withColumn(
      "app",
      sanitizeFieldNames(appFieldType, df("app")(_))
    )
    

    对于您的类型,此递归函数将返回类似的列

    f.struct(
      df("app")("environment").as("environment"),
      df("app")("name").as("name"),
      f.struct(
        df("app")("type")("word tier").as("word_tier"),
        df("app")("type")("level").as("level")
      ).as("type")
    )
    

    然后将其分配给“app”字段,替换那里存在的内容。

    不过,此解决方案存在局限性。它不支持嵌套数组或映射:如果您的架构在数组或映射内部具有结构,则此方法不会转换数组和映射内部的任何结构。话虽如此,在 Spark 2.4 中,它们添加了对集合执行操作的函数,因此在 Spark 2.4 中,该函数可能也可以泛化为支持嵌套数组和映射。

    最后,你可以用mapPartitions 做你想做的事。首先,您编写一个递归方法,它只清理您的字段的StructType

    def sanitizeType(dt: DataType): DataType = dt match {
      case st: StructType => ...  // rename fields and invoke recursively
      case at: ArrayType => ...  // invoke recursively
      case mt: MapType => ...  // invoke recursively
      case _ => dt  // simple types do not have anything to sanitize
    }
    

    其次,您将经过净化的架构应用于您的数据框。基本上有两种方法可以做到这一点:一种是安全的mapPartitions,另一种是依赖于内部 Spark API。

    mapPartitions,很简单:

    df.mapPartitions(identity)(RowEncoder(sanitizeType(df.schema)))
    

    在这里,我们应用mapPartitions 操作并明确指定输出编码器。请记住,Spark 中的模式不是数据固有的:它们总是与特定的数据帧相关联。数据框中的所有数据都表示为在单个字段上没有标签的行,只有位置。只要您的架构在相同位置具有完全相同的类型(但名称可能不同),它就应该按您的预期工作。

    mapPartitions 确实会在逻辑计划中产生几个额外的节点。为了避免这种情况,可以直接用特定的编码器构造一个Dataset[Row] 实例:

    new Dataset[Row](df.sparkSession, df.queryExecution.logical, RowEncoder(sanitizeType(df.schema)))
    

    这将避免不必要的mapPartitions(这通常会导致查询执行计划中的反序列化映射序列化步骤),但它可能不安全;我个人现在没有检查它,但它可以为你工作。

    【讨论】:

    • 感谢您的回答,但我有点迷茫,在我们应用 sanitizeName 像这样 val appFieldType = df.schema("app").asInstanceOf[StructType] 的第一步时,我需要保留整个架构的其余字段会发生什么.以及 sanitizeName 定义中的 f 是什么。
    • 它是否保留旧模式?添加到替换架构中的应用会发生什么情况?
    • ffunctions 对象,我的错 - 忘记添加导入。在使用f.struct 的方法中,我使用Dataset.withColumn 方法,它使用提供的定义添加或替换指定的列(在这种情况下,它替换了app 列)。其他列不受影响。
    猜你喜欢
    • 2019-08-17
    • 2016-06-06
    • 2016-12-29
    • 1970-01-01
    • 2017-06-21
    • 1970-01-01
    • 1970-01-01
    • 2020-10-02
    • 2019-07-07
    相关资源
    最近更新 更多