【Question Title】: Spark: Generating JSON schema for a JSON string
【Posted】: 2019-12-01 02:42:11
【Question Description】:

I am using Spark 2.4.3 and Scala 2.11.

Below is the JSON string I currently have in a DataFrame column. I am trying to use the schema_of_json function to store the schema of this JSON string in another column, but it fails with the error below. How can I fix this?

{
  "company": {
    "companyId": "123",
    "companyName": "ABC"
  },
  "customer": {
    "customerDetails": {
      "customerId": "CUST-100",
      "customerName": "CUST-AAA",
      "status": "ACTIVE",
      "phone": {
        "phoneDetails": {
          "home": {
            "phoneno": "666-777-9999"
          },
          "mobile": {
            "phoneno": "333-444-5555"
          }
        }
      }
    },
    "address": {
      "loc": "NORTH",
      "adressDetails": [
        {
          "street": "BBB",
          "city": "YYYYY",
          "province": "AB",
          "country": "US"
        },
        {
          "street": "UUU",
          "city": "GGGGG",
          "province": "NB",
          "country": "US"
        }
      ]
    }
  }
}

Code:

val df = spark.read.textFile("./src/main/resources/json/company.txt")
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"company":{"companyId":"123","companyName":"ABC"},"customer":{"customerDetails":{"customerId":"CUST-100","customerName":"CUST-AAA","status":"ACTIVE","phone":{"phoneDetails":{"home":{"phoneno":"666-777-9999"},"mobile":{"phoneno":"333-444-5555"}}}},"address":{"loc":"NORTH","adressDetails":[{"street":"BBB","city":"YYYYY","province":"AB","country":"US"},{"street":"UUU","city":"GGGGG","province":"NB","country":"US"}]}}}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


df.withColumn("jsonSchema",schema_of_json(col("value")))

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [value#0, schemaofjson(value#0) AS jsonSchema#10]
+- Project [value#0]
   +- Relation[value#0] text

【Question Discussion】:

    Tags: json scala apache-spark


    【Solution 1】:

    The workaround I found is to pass the content of the value column to the schema_of_json function as a literal, as shown below. Note that df.select(col("value")).first.getString(0) reads only the first row, so the derived schema reflects that single row's JSON.

    df.withColumn("jsonSchema",schema_of_json(df.select(col("value")).first.getString(0)))
    

    Courtesy of:

    Implicit schema discovery on a JSON-formatted Spark DataFrame column

    【Discussion】:

      【Solution 2】:

      Since the introduction of SPARK-24709, schema_of_json accepts only literal strings. You can extract the schema of the String in DDL format by calling:

      spark.read
        .json(df.select("value").as[String])
        .schema
        .toDDL
      

      【Discussion】:

      • Thanks. In my case, not every row has the same JSON string with the same schema. How can I handle that?
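
      One way to approach rows with differently shaped JSON is to infer a per-row schema description and compare them. The sketch below is plain Python (independent of Spark, not code from any of the answers); `infer_schema` is a hypothetical helper introduced here for illustration.

      ```python
      import json

      def infer_schema(value):
          """Recursively map a parsed JSON value to a schema-like description."""
          if isinstance(value, dict):
              return {k: infer_schema(v) for k, v in value.items()}
          if isinstance(value, list):
              # Describe a list by the schema of its first element (if any)
              return [infer_schema(value[0])] if value else []
          return type(value).__name__

      row1 = '{"company": {"companyId": "123"}, "active": true}'
      row2 = '{"company": {"companyId": "456", "companyName": "XYZ"}}'

      schema1 = infer_schema(json.loads(row1))
      schema2 = infer_schema(json.loads(row2))
      # Rows with different shapes yield different schema descriptions
      print(schema1 == schema2)
      ```

      Such a helper could be wrapped in a UDF (similar to Solution 3 below) to store a schema description per row, which can then be grouped or compared across the DataFrame.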
      【Solution 3】:

      If anyone is looking for a pyspark answer:

      import pyspark.sql.functions as F
      import pyspark.sql.types as T
      import json

      def process(json_content):
          """Parse the JSON string and return its top-level keys."""
          if json_content is None:
              return []
          try:
              keys = json.loads(json_content).keys()
              return list(keys)
          except Exception as e:
              # Return the error message as a string so it fits ArrayType(StringType)
              return [str(e)]

      udf_function = F.udf(process, T.ArrayType(T.StringType()))
      my_df = my_df.withColumn("schema", udf_function(F.col("json_raw")))
      
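
      Note that the UDF above returns only top-level keys. If nested key paths are needed as well, a recursive variant could be used in its place. The sketch below is plain Python (Spark-independent); `key_paths` is a name introduced here for illustration.

      ```python
      import json

      def key_paths(obj, prefix=""):
          """Recursively collect dotted key paths from a parsed JSON value."""
          paths = []
          if isinstance(obj, dict):
              for k, v in obj.items():
                  full = f"{prefix}.{k}" if prefix else k
                  paths.append(full)
                  paths.extend(key_paths(v, full))
          elif isinstance(obj, list):
              for item in obj:
                  paths.extend(key_paths(item, prefix))
          return paths

      sample = '{"company": {"companyId": "123", "companyName": "ABC"}}'
      print(key_paths(json.loads(sample)))
      # -> ['company', 'company.companyId', 'company.companyName']
      ```

      This helper could replace `process` inside the same `F.udf` wrapper to store the full set of nested key paths per row.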

      【Discussion】:
