【问题标题】:Convert nested json string to columns in DataFrame将嵌套的 json 字符串转换为 DataFrame 中的列
【发布时间】:2018-03-21 02:29:30
【问题描述】:

我在 DataFrame 中有一个列,其中包含字符串格式的嵌套 json

val df=Seq(("""{"-1":{"-1":[ 7420,0,20,22,0,0]}}""" ), ("""{"-1":{"-1":[1006,2,18,10,0,0]}}"""), ("""{"-1":{"-1":[6414,0,17,11,0,0]}}""")).toDF("column1")


+-------------------------------------+
|                              column1|           
+-------------------------------------+
|{"-1":{"-1":[7420, 0, 20, 22, 0, 0]}}|
|{"-1":{"-1":[1006, 2, 18, 10, 0, 0]}}|
|{"-1":{"-1":[6414, 0, 17, 11, 0, 0]}}|
+-----------------------+-------------+

I want to get a data frame that looks like this

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
|  -1|  -1|7420|   0|  20|  22|   0|   0|
|  -1|  -1|1006|   2|  18|  10|   0|   0|
|  -1|  -1|6414|   0|  17|  11|   0|   0|
+----+----+----+----+----+----+----+----+

我首先申请了get_json_object,它给了我

val df1= df.select(get_json_object($"column1", "$.-1")

+------------------------------+
|                       column1|           
+------------------------------+
|{"-1":[7420, 0, 20, 22, 0, 0]}|
|{"-1":[1006, 2, 18, 10, 0, 0]}|
|{"-1":[6414, 0, 17, 11, 0, 0]}|
+-----------------------+------+

所以我丢失了第一个元素。

我尝试将现有元素转换为我想要的格式

val schema = new StructType()                              
.add("-1",                                         
MapType(                                             
  StringType,
  new StructType()
  .add("a1", StringType)
  .add("a2", StringType)
  .add("a3", StringType)
  .add("a4", StringType)
  .add("a5", StringType)
  .add("a6", StringType)
  .add("a7", StringType)
  .add("a8", StringType)
  .add("a9", StringType)
  .add("a10", StringType)
  .add("a11", StringType)
  .add("a11", StringType)))

df1.select(from_json($"new2", schema ))

但它返回了一个包含所有空值的 1 列 DataFrame

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以简单地使用from_json 内置函数将json字符串转换为实际的json对象schema定义为StructType(Seq(StructField("-1", StructType(Seq(StructField("-1", ArrayType(IntegerType)))))))

    import org.apache.spark.sql.functions._
    val jsonedDF = df.select(from_json(col("column1"), StructType(Seq(StructField("-1", StructType(Seq(StructField("-1", ArrayType(IntegerType)))))))).as("json"))
    jsonedDF.show(false)
    //    +---------------------------------------+
    //    |json                                   |
    //    +---------------------------------------+
    //    |[[WrappedArray(7420, 0, 20, 22, 0, 0)]]|
    //    |[[WrappedArray(1006, 2, 18, 10, 0, 0)]]|
    //    |[[WrappedArray(6414, 0, 17, 11, 0, 0)]]|
    //    +---------------------------------------+
    jsonedDF.printSchema()
    //    root
    //    |-- json: struct (nullable = true)
    //    |    |-- -1: struct (nullable = true)
    //    |    |    |-- -1: array (nullable = true)
    //    |    |    |    |-- element: integer (containsNull = true)
    

    之后,只需选择适当的列并使用别名为列指定适当的名称

    jsonedDF.select(
      lit("-1").as("col1"),
      lit("-1").as("col2"),
      col("json.-1.-1")(0).as("col3"),
      col("json.-1.-1")(1).as("col4"),
      col("json.-1.-1")(2).as("col5"),
      col("json.-1.-1")(3).as("col6"),
      col("json.-1.-1")(4).as("col7"),
      col("json.-1.-1")(5).as("col8")
    ).show(false)
    

    这应该会给你最后的dataframe

    +----+----+----+----+----+----+----+----+
    |col1|col2|col3|col4|col5|col6|col7|col8|
    +----+----+----+----+----+----+----+----+
    |-1  |-1  |7420|0   |20  |22  |0   |0   |
    |-1  |-1  |1006|2   |18  |10  |0   |0   |
    |-1  |-1  |6414|0   |17  |11  |0   |0   |
    +----+----+----+----+----+----+----+----+
    

    我使用 -1 作为文字,因为 它们是 json 字符串中的键名,并且总是相同的。

    【讨论】:

      【解决方案2】:

      您提供的 JSON 数据似乎无效

      您可以更改为字符串的 rdd 并将所有 "[]{}: 替换为空,将 : 替换为 , 以便创建一个逗号分隔的字符串并将其转换回数据框,如下所示

        //data as you provided 
        val df = Seq(
          ("""{"-1":{"-1":[ 7420,0,20,22,0,0]}}"""),
          ("""{"-1":{"-1":[1006,2,18,10,0,0]}}"""),
          ("""{"-1":{"-1":[6414,0,17,11,0,0]}}""")
        ).toDF("column1")
      
        //create a schema 
        val schema = new StructType()
          .add("col1", StringType)
          .add("col2", StringType)
          .add("col3", StringType)
          .add("col4", StringType)
          .add("col5", StringType)
          .add("col6", StringType)
          .add("col7", StringType)
          .add("col8", StringType)
          /*.add("a9", StringType)
          .add("a10", StringType)
          .add("a11", StringType)
          .add("a11", StringType)*/
      
        //convert to rdd and replace using regex 
        val df2 = df.rdd.map(_.getString(0))
          .map(_.replaceAll("[\"|\\[|\\]|{|}]", "").replace(":", ","))
          .map(_.split(","))
          .map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
          .toDF(schema.fieldNames :_*)
      

      val rdd = df.rdd.map(_.getString(0))
          .map(_.replaceAll("[\"|\\[|\\]|{|}]", "").replace(":", ","))
          .map(_.split(","))
          .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
      
        val finalDF = spark.sqlContext.createDataFrame(rdd, schema)
      
        df2.show()
        //or 
        finalDF.show()
        //will have a same output
      

      输出:

      +----+----+-----+----+----+----+----+----+
      |col1|col2|col3 |col4|col5|col6|col7|col8|
      +----+----+-----+----+----+----+----+----+
      |-1  |-1  | 7420|0   |20  |22  |0   |0   |
      |-1  |-1  |1006 |2   |18  |10  |0   |0   |
      |-1  |-1  |6414 |0   |17  |11  |0   |0   |
      +----+----+-----+----+----+----+----+----+
      

      希望这会有所帮助!

      【讨论】:

        猜你喜欢
        • 2018-09-25
        • 2020-09-02
        • 1970-01-01
        • 2020-02-16
        • 2016-02-08
        • 2014-06-27
        • 1970-01-01
        • 1970-01-01
        • 2019-06-09
        相关资源
        最近更新 更多