【发布时间】:2019-08-15 20:49:47
【问题描述】:
我有一个 JSON 字符串,我将其加载到 Spark DataFrame 中。 JSON 字符串可以有 0 到 3 个键值对。
当发送多个 kv 对时,product_facets 被正确格式化为如下数组:
{"id":1,
"productData":{
"product":{
"product_name":"xyz",
"product_facets":{"entry":[{"key":"test","value":"success"}, {"key": "test2","value" : "fail"}]}
}}}
我现在可以使用爆炸功能了:
sourceDF.filter($"someKey".contains("some_string"))
.select($"id", explode($"productData.product.product_facets.entry") as "kvPairs")
但是,当仅发送一个键值时,用于条目的源 JSON 字符串未格式化为带有方括号的数组:
{"id":1,
"productData":{
"product":{
"product_name":"xyz",
"product_facets":{"entry":{"key":"test","value":"success"}}
}}}
产品标签的架构如下:
| |-- product: struct (nullable = true)
| | |-- product_facets: struct (nullable = true)
| | | |-- entry: string (nullable = true)
| | |-- product_name: string (nullable = true)
如何将条目更改为与explode 函数兼容的键值对数组。我的最终目标是将键旋转到各个列中,并且我想使用 group by 来分解 kv 对。我尝试使用from_json,但无法正常工作。
val schema =
StructType(
Seq(
StructField("entry", ArrayType(
StructType(
Seq(
StructField("key", StringType),
StructField("value",StringType)
)
)
))
)
)
sourceDF.filter($"someKey".contains("some_string"))
.select($"id", from_json($"productData.product.product_facets.entry", schema) as "kvPairsFromJson")
但上面确实创建了一个看起来像“[]”的新列 kvPairsFromJson,并且使用 explode 没有任何作用。
关于发生了什么或是否有更好的方法可以做到这一点的任何指针?
【问题讨论】:
-
你有两种数据:一种它的'product_facets'是一个数组,另一种它的'product_facets'是一个字符串。我对吗?您正在尝试加载两者并将它们作为单一字段(product_facets-wise)来处理。是这样吗?
-
@nir-hedvat 是的,这是正确的。一个是数组,在其他情况下它是一个字符串,我想将它们都视为一个数组,以便能够使用explode函数。
-
这使用简单的 SQL 查询是不可行的,因为 Spark 无法处理具有多个模式(包括读取和写入)的数据。您应该使用 UDF 来实现这一点。看看这个docs.databricks.com/spark/latest/spark-sql/udf-scala.html。只需传递保存数据的字段并始终为其返回一个数组。
标签: scala apache-spark apache-spark-sql