【问题标题】:Scala Spark - Split JSON column to multiple columnsScala Spark - 将 JSON 列拆分为多列
【发布时间】:2020-04-24 01:00:35
【问题描述】:

Scala 菜鸟,使用 Spark 2.3.0
我正在使用创建 JSON 字符串列的 udf 创建 DataFrame:

val result: DataFrame = df.withColumn("decrypted_json", instance.decryptJsonUdf(df("encrypted_data")))

输出如下:

+----------------+---------------------------------------+
| encrypted_data | decrypted_json                        |
+----------------+---------------------------------------+
|eyJleHAiOjE1 ...| {"a":547.65 , "b":"Some Data"}        |
+----------------+---------------------------------------+

UDF 是一个外部代码,我无法更改。我想将 decrypted_json 列拆分为单独的列,以便输出 DataFrame 如下所示:

+----------------+----------------------+
| encrypted_data | a      | b           |
+----------------+--------+-------------+
|eyJleHAiOjE1 ...| 547.65 | "Some Data" |
+----------------+--------+-------------+

【问题讨论】:

标签: json scala apache-spark user-defined-functions


【解决方案1】:

使用from_jason,您可以将 JSON 解析为 Struct 类型,然后从该数据框中选择列。您将需要知道 json 的架构。方法如下 -

    val sparkSession = //create spark session
    import sparkSession.implicits._

    val jsonData = """{"a":547.65 , "b":"Some Data"}"""
    val schema = {StructType(
      List(
        StructField("a", DoubleType, nullable = false),
        StructField("b", StringType, nullable = false)
      ))}

    val df = sparkSession.createDataset(Seq(("dummy data",jsonData))).toDF("string_column","json_column")
    val dfWithParsedJson = df.withColumn("json_data",from_json($"json_column",schema))

    dfWithParsedJson.select($"string_column",$"json_column",$"json_data.a", $"json_data.b").show()

结果

+-------------+------------------------------+------+---------+
|string_column|json_column                   |a     |b        |
+-------------+------------------------------+------+---------+
|dummy data   |{"a":547.65 , "b":"Some Data"}|547.65|Some Data|
+-------------+------------------------------+------+---------+

【讨论】:

  • 感谢您的回复,我究竟应该传递什么架构?
  • 需要传递json的Schema。我认为我的代码有一个额外的引号。有时间我会修的。
【解决方案2】:

以下解决方案的灵感来自@Jacek Laskowski 给出的解决方案之一:

import org.apache.spark.sql.types._
val JsonSchema = new StructType()
  .add($"a".string)
  .add($"b".string)
val schema = new StructType()
  .add($"encrypted_data".string)
  .add($"decrypted_json".array(JsonSchema))

val schemaAsJson = schema.json

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)

import org.apache.spark.sql.functions._

val rawJsons = Seq("""
  {
    "encrypted_data" : "eyJleHAiOjE1",
    "decrypted_json" : [
      {
        "a" : "547.65",
        "b" : "Some Data"
      }
    ]
  }
""").toDF("rawjson")

val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"decrypted_json")) // <-- explode the array field
  .drop("decrypted_json")  // <-- no longer needed
  .select("encrypted_data", "address.*") // <-- flatten the struct field

请通过Link获取原始解决方案并附上说明。
我希望这会有所帮助。

【讨论】:

  • 如何让header变成“encrypted_data, json_a, json_b”,即在json的字段中添加“json_”前缀?
猜你喜欢
  • 1970-01-01
  • 2017-01-08
  • 1970-01-01
  • 2019-03-11
  • 2018-09-29
  • 2021-09-03
  • 2020-12-06
  • 2017-01-07
相关资源
最近更新 更多