【问题标题】:Convert a json string to array of key-value pairs in Spark scala在 Spark scala 中将 json 字符串转换为键值对数组
【发布时间】:2019-08-15 20:49:47
【问题描述】:

我有一个 JSON 字符串,我将其加载到 Spark DataFrame 中。 JSON 字符串可以有 0 到 3 个键值对。

当发送多个 kv 对时,product_facets 被正确格式化为如下数组:

{"id":1,
  "productData":{
  "product":{
  "product_name":"xyz",
  "product_facets":{"entry":[{"key":"test","value":"success"}, {"key": "test2","value" : "fail"}]}
 }}}

我现在可以使用爆炸功能了:

sourceDF.filter($"someKey".contains("some_string"))
  .select($"id", explode($"productData.product.product_facets.entry") as "kvPairs")

但是,当仅发送一个键值时,用于条目的源 JSON 字符串未格式化为带有方括号的数组:

{"id":1,
  "productData":{
  "product":{
  "product_name":"xyz",
  "product_facets":{"entry":{"key":"test","value":"success"}}
 }}}

产品标签的架构如下:

|    |-- product: struct (nullable = true)
|    |    |-- product_facets: struct (nullable = true)
|    |    |    |-- entry: string (nullable = true)
|    |    |-- product_name: string (nullable = true)

如何将条目更改为与explode 函数兼容的键值对数组。我的最终目标是将键旋转到各个列中,并且我想使用 group by 来分解 kv 对。我尝试使用from_json,但无法正常工作。

    val schema =
    StructType(
      Seq(
        StructField("entry", ArrayType(
          StructType(
            Seq(
              StructField("key", StringType),
              StructField("value",StringType)
            )
          )
        ))
      )
    )

sourceDF.filter($"someKey".contains("some_string"))
      .select($"id", from_json($"productData.product.product_facets.entry", schema) as "kvPairsFromJson")

但上面确实创建了一个看起来像“[]”的新列 kvPairsFromJson,并且使用 explode 没有任何作用。

关于发生了什么或是否有更好的方法可以做到这一点的任何指针?

【问题讨论】:

  • 你有两种数据:一种它的'product_facets'是一个数组,另一种它的'product_facets'是一个字符串。我对吗?您正在尝试加载两者并将它们作为单一字段(product_facets-wise)来处理。是这样吗?
  • @nir-hedvat 是的,这是正确的。一个是数组,在其他情况下它是一个字符串,我想将它们都视为一个数组,以便能够使用explode函数。
  • 这使用简单的 SQL 查询是不可行的,因为 Spark 无法处理具有多个模式(包括读取和写入)的数据。您应该使用 UDF 来实现这一点。看看这个docs.databricks.com/spark/latest/spark-sql/udf-scala.html。只需传递保存数据的字段并始终为其返回一个数组。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

我认为一种方法可能是:
1. 创建一个以entry值为json字符串的udf,并将其转换为List( Tuple(K, V))
2.在udf中,检查entry的值是否为数组,并进行相应的转换。

下面的代码解释了上述方法:

// one row where entry is array and other non-array
val ds = Seq("""{"id":1,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":[{"key":"test","value":"success"},{"key":"test2","value":"fail"}]}}}}""", """{"id":2,"productData":{"product":{"product_name":"xyz","product_facets":{"entry":{"key":"test","value":"success"}}}}}""").toDS

val df = spark.read.json(ds)

// Schema used by udf to generate output column    
import org.apache.spark.sql.types._
val outputSchema = ArrayType(StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", StringType, false)
)))

// Converts non-array entry value to array
val toArray = udf((json: String) => {

  import com.fasterxml.jackson.databind._
  import com.fasterxml.jackson.module.scala.DefaultScalaModule

  val jsonMapper = new ObjectMapper()
  jsonMapper.registerModule(DefaultScalaModule)

  if(!json.startsWith("[")) {
    val jsonMap = jsonMapper.readValue(json, classOf[Map[String, String]])
    List((jsonMap("key"), jsonMap("value")))
  } else {
    jsonMapper.readValue(json, classOf[List[Map[String, String]]]).map(f => (f("key"), f("value")))
  } 

}, outputSchema)

val arrayResult = df.select(col("id").as("id"), toArray(col("productData.product.product_facets.entry")).as("entry"))

val arrayExploded = df.select(col("id").as("id"), explode(toArray(col("productData.product.product_facets.entry"))).as("entry"))

val explodedToCols = df.select(col("id").as("id"), explode(toArray(col("productData.product.product_facets.entry"))).as("entry")).select(col("id"), col("entry.key").as("key"), col("entry.value").as("value"))

结果:

scala> arrayResult.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = false)
 |    |    |-- value: string (nullable = false)


scala> arrayExploded.printSchema
root
 |-- id: long (nullable = true)
 |-- entry: struct (nullable = true)
 |    |-- key: string (nullable = false)
 |    |-- value: string (nullable = false)

scala> arrayResult.show(false)
+---+--------------------------------+
|id |entry                           |
+---+--------------------------------+
|1  |[[test, success], [test2, fail]]|
|2  |[[test, success]]               |
+---+--------------------------------+

scala> arrayExploded.show(false)
+---+---------------+
|id |entry          |
+---+---------------+
|1  |[test, success]|
|1  |[test2, fail]  |
|2  |[test, success]|
+---+---------------+

【讨论】:

  • 感谢您编写 sn-p。那真的很有帮助。在我接受这个作为答案之前。我认为我们需要处理“json”的输入字符串可以为空的情况。在某些情况下 product_facets 是一个空字符串,如何在 udf 中处理它?
  • 我想通了。在 product_facets 为空字符串的情况下,udf 需要稍作修改才能返回一个空数组。
猜你喜欢
  • 1970-01-01
  • 2017-10-10
  • 1970-01-01
  • 1970-01-01
  • 2019-09-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-09
相关资源
最近更新 更多