【发布时间】:2019-06-13 01:02:26
【问题描述】:
我有一个用例需要读取嵌套的 JSON 架构并将其写回为 Parquet(我的架构会根据我读取数据的那一天发生变化,所以我不知道提前了解确切的模式),因为在我的一些嵌套键中,当我想将其保存为镶木地板时,我有一些类似空格的字符,我收到一个异常,抱怨特殊字符 ,;{}()\\n\\t=
这是一个示例架构,它不是真正的架构键是动态的并且每天都在变化
val nestedSchema = StructType(Seq(
StructField("event_time", StringType),
StructField("event_id", StringType),
StructField("app", StructType(Seq(
StructField("environment", StringType),
StructField("name", StringType),
StructField("type", StructType(Seq(
StructField("word tier", StringType), ### This cause problem when you save it as Parquet
StructField("level", StringType)
))
))))))
val nestedDF = spark.createDataFrame(sc.emptyRDD[Row], nestedSchema)
myDF.printSchema
输出
root
|-- event_time: string (nullable = true)
|-- event_id: string (nullable = true)
|-- app: struct (nullable = true)
| |-- environment: string (nullable = true)
| |-- name: string (nullable = true)
| |-- type: struct (nullable = true)
| | |-- word tier: string (nullable = true)
| | |-- level: string (nullable = true)
尝试另存为镶木地板
myDF.write
.mode("overwrite")
.option("compression", "snappy")
.parquet("PATH/TO/DESTINATION")
我找到了这样的解决方案
myDF.toDF(myDF
.schema
.fieldNames
.map(name => "[ ,;{}()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)
.write
.mode("overwrite")
.option("compression", "snappy")
.parquet("PATH/TO/DESTINATION")
但它仅适用于父键,不适用于嵌套键。是否有任何递归解决方案?
我的问题不是 this question 的重复,因为我的架构是动态的,我不知道我的密钥是什么。它会根据我正在读取的数据而变化,所以我的解决方案应该是通用的,我需要以某种方式递归地创建相同的架构结构,但键名是正确的。
【问题讨论】:
标签: scala apache-spark schema parquet