【问题标题】:spark how to convert a json string to a struct column without schemaspark如何将json字符串转换为没有架构的结构列
【发布时间】:2022-04-21 22:13:27
【问题描述】:

火花:3.0.0 斯卡拉:2.12.8

我的数据框有一列包含 JSON 字符串,我想使用 StructType 从中创建一个新列。


|temp_json_string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+
|{"name":"test","id":"12","category":[{"products":["A","B"],"displayName":"test_1","displayLabel":"test1"},{"products":["C"],"displayName":"test_2","displayLabel":"test2"}],"createdAt":"","createdBy":""}|
+
root
 |-- temp_json_string: string (nullable = true)

json 字符串看起来像

{
  "name":"test",
  "id":"12",
  "category":[
    {
      "products":[
        "A",
        "B"
      ],
      "displayName":"test_1",
      "displayLabel":"test1"
    },
    {
      "products":[
        "C"
      ],
      "displayName":"test_2",
      "displayLabel":"test2"
    }
  ],
  "createdAt":"",
  "createdBy":""
}

我想创建一个 Struct 类型的新列,所以我尝试了:

 dataFrame
      .withColumn("temp_json_struct", struct(col("temp_json_string")))
      .select("temp_json_struct")

现在,我得到的架构为:

root
 |-- temp_json_struct: struct (nullable = false)
 |    |-- temp_json_string: string (nullable = true)

我正在寻找的东西是:

root
 |-- temp_json_struct: struct (nullable = false)
 |    |-- name: string (nullable = true)
      |-- category: array (nullable = true)
         |-- products: array (nullable = true)
         |-- displayName: string (nullable = true)
         |-- displayLabel: string (nullable = true)
      |-- createdAt: timestamp (nullable = true)
      |-- updatedAt: timestamp (nullable = true)

另外,我不知道可以在 JSON 字符串中的架构。

我已经寻找其他选项,但无法找出解决方案。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    我对来自 mongo 的数据有同样的问题。 _doc 是包含 json 字符串的列。我的有多个文件,这就是为什么第一行遍历每一行以提取模式的原因。此外,如果您事先知道您的架构,那么只需将 json_schema 替换为该架构即可。

    json_schema = spark.read.json(df.rdd.map(lambda row: row._doc)).schema
    df= df.withColumn('new_json_column', from_json(col('_doc'), json_schema))
    

    【讨论】:

      【解决方案2】:

      至少有两种不同的方法可以检索/发现给定 JSON 的架构。

      为了说明,我们先创建一些数据:

      import org.apache.spark.sql.types.StructType
      
      val jsData = Seq(
        ("""{
          "name":"test","id":"12","category":[
          {
            "products":[
              "A",
              "B"
            ],
            "displayName":"test_1",
            "displayLabel":"test1"
          },
          {
            "products":[
              "C"
            ],
            "displayName":"test_2",
            "displayLabel":"test2"
          }
        ],
        "createdAt":"",
        "createdBy":""}""")
      )
      

      选项 1:schema_of_json

      第一个选项是使用内置函数schema_of_json。该函数将以 DDL 格式返回给定 JSON 的架构:

      val json = jsData.toDF("js").collect()(0).getString(0)
      
      val ddlSchema: String = spark.sql(s"select schema_of_json('${json}')")
                                  .collect()(0) //get 1st row
                                  .getString(0) //get 1st col of the row as string
                                  .replace("null", "string") //replace type with string, this occurs since you have "createdAt":"" 
      
      // struct<category:array<struct<displayLabel:string,displayName:string,products:array<string>>>,createdAt:null,createdBy:null,id:string,name:string>
      
      val schema: StructType = StructType.fromDDL(s"js_schema $ddlSchema")
      

      请注意,您希望 schema_of_json 也可以在列级别上工作,即: schema_of_json(js_col),不幸的是,这不能按预期工作,因此我们不得不改为传递字符串。

      选项 2:使用 Spark JSON 阅读器(推荐)

      import org.apache.spark.sql.functions.from_json
      
      val schema: StructType = spark.read.json(jsData.toDS).schema
      
      // schema.printTreeString
      
      // root
      //  |-- category: array (nullable = true)
      //  |    |-- element: struct (containsNull = true)
      //  |    |    |-- displayLabel: string (nullable = true)
      //  |    |    |-- displayName: string (nullable = true)
      //  |    |    |-- products: array (nullable = true)
      //  |    |    |    |-- element: string (containsNull = true)
      //  |-- createdAt: string (nullable = true)
      //  |-- createdBy: string (nullable = true)
      //  |-- id: string (nullable = true)
      //  |-- name: string (nullable = true)
      

      如您所见,这里我们生成的是基于StructType 的架构,而不是像之前的情况那样的 DDL 字符串。

      发现架构后,我们可以继续下一步,将 JSON 数据转换为结构。为此,我们将使用from_json 内置函数:

      jsData.toDF("js")
            .withColumn("temp_json_struct", from_json($"js", schema))
            .printSchema()
      
      // root
      //  |-- js: string (nullable = true)
      //  |-- temp_json_struct: struct (nullable = true)
      //  |    |-- category: array (nullable = true)
      //  |    |    |-- element: struct (containsNull = true)
      //  |    |    |    |-- displayLabel: string (nullable = true)
      //  |    |    |    |-- displayName: string (nullable = true)
      //  |    |    |    |-- products: array (nullable = true)
      //  |    |    |    |    |-- element: string (containsNull = true)
      //  |    |-- createdAt: string (nullable = true)
      //  |    |-- createdBy: string (nullable = true)
      //  |    |-- id: string (nullable = true)
      //  |    |-- name: string (nullable = true)
      

      【讨论】:

        【解决方案3】:
        // import spark implicits for conversion to dataset (.as[String])
        import spark.implicits._
        
        val df = ??? //create your dataframe having the 'temp_json_string' column
        
        //convert Dataset[Row] aka Dataframe to Dataset[String]
        val ds = df.select("temp_json_string").as[String]
        
        //read as json
        spark.read.json(ds)
        

        Documentation

        【讨论】:

        • 请在您的答案中添加一些解释,以便其他人可以从中学习
        • 解释在 cmets。我还要解释什么
        • 一个好的解释可能不仅包含代码和代码在做什么,而且还包含为什么您选择这样做,尤其是如果问题已经包含其他答案
        • 简洁优雅。出于某种原因,其他答案都不适用于我的实例(缺少 lambdaschema_of_jsontoDS 方法),但这里没有问题!
        猜你喜欢
        • 1970-01-01
        • 2017-11-03
        • 1970-01-01
        • 1970-01-01
        • 2011-01-10
        • 2015-02-19
        • 1970-01-01
        • 1970-01-01
        • 2019-09-22
        相关资源
        最近更新 更多