【问题标题】:Infer schema from json string从 json 字符串推断模式
【发布时间】:2021-06-16 18:01:52
【问题描述】:

我有这个数据框:

cSchema = StructType([StructField("id1", StringType()), StructField("id2", StringType()), StructField("params", StringType())\
                      ,StructField("Col2", IntegerType())])

test_list = [[1, 2, '{"param1": "val1", "param2": "val2"}', 1], [1, 3, '{"param1": "val4", "param2": "val5"}', 3]]

df = spark.createDataFrame(test_list,schema=cSchema) 

+---+---+--------------------+----+
|id1|id2|              params|Col2|
+---+---+--------------------+----+
|  1|  2|{"param1": "val1"...|   1|
|  1|  3|{"param1": "val4"...|   3|
+---+---+--------------------+----+

我想将参数分解成列:

+---+---+----+------+------+
|id1|id2|Col2|param1|param2|
+---+---+----+------+------+
|  1|  2|   1|  val1|  val2|
|  1|  3|   3|  val4|  val5|
+---+---+----+------+------+

所以我编码了这个:

schema2 = StructType([StructField("param1", StringType()), StructField("param2", StringType())])

df.withColumn(
  "params", from_json("params", schema2)
).select(
  col('id1'), col('id2'),col('Col2'), col('params.*')
).show()

问题是params架构是动态的(变量schema2),他可能会从一个执行更改为另一个,所以我需要动态推断架构(可以让所有列都具有String类型).. .我想不出办法做到这一点..

谁能帮我解决这个问题?

【问题讨论】:

    标签: python json apache-spark pyspark apache-spark-sql


    【解决方案1】:

    在 Pyspark 中的语法应该是:

    import pyspark.sql.functions as F
    schema = F.schema_of_json(df.select('params').head()[0])
    
    df2 = df.withColumn(
      "params", F.from_json("params", schema)
    ).select(
      'id1', 'id2', 'Col2', 'params.*'
    )
    
    df2.show()
    +---+---+----+------+------+
    |id1|id2|Col2|param1|param2|
    +---+---+----+------+------+
    |  1|  2|   1|  val1|  val2|
    |  1|  3|   3|  val4|  val5|
    +---+---+----+------+------+
    

    【讨论】:

      【解决方案2】:

      这里是怎么做的,希望你能改成python

      使用schema_of_json 从值中动态获取架构,并使用from_json 进行读取。

      val schema = schema_of_json(df.first().getAs[String]("params"))
      df.withColumn("params", from_json($"params", schema))
        .select("id1", "id2", "Col2", "params.*")
        .show(false)
      

      【讨论】:

        【解决方案3】:

        如果您想获得更大的数据样本进行比较,可以将 params 字段读入列表,将其转换为 RDD,然后使用“spark.read.json()”读取

        params_list = df.select("params").rdd.flatMap(lambda x: x).collect()
        params_rdd = sc.parallelize(params_list)
        spark.read.json(params_rdd).schema
        

        这里需要注意的是,您可能不想加载太多数据,因为它们都被填充到局部变量中。尝试选择前 1000 名或任何合适的样本量。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2017-02-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-11-18
          相关资源
          最近更新 更多