【问题标题】:pyspark dataframe pivot a json column to new columnspyspark 数据框将 json 列旋转到新列
【发布时间】:2020-11-17 08:39:01
【问题描述】:

我想通过 python3 从 pyspark 数据框中的 json 列中提取数据。

我的数据框:

  year month p_name json_col 
  2010 05    rchsc  [{"attri_name": "in_market", "value": "yes"}, {"attri_name": "weight", "value": "12.56"}, {"attri_name" : "color", "value" : "red"} ]

我需要一个像这样的数据框:

 year month p_name in_market weight color 
 2010 05    rchsc  yes       12.56  red

我试过了

 from pyspark.sql.functions import from_json, col
 from pyspark.sql.types import StructType, StructField, StringType

 schema = StructType(
   [
     StructField('attri_name', StringType(), True),
    StructField('value', StringType(), True)
   ]
 )
 df.withColumn("new_col", from_json("json_col", schema))

但是,不会创建新列。 我不确定如何分解 json 列并将它们转换为新列。

【问题讨论】:

  • 我的回答和shu一样,只是在做pi​​vot的时候缩短了一点直接抓取struct的元素。

标签: python json dataframe apache-spark pyspark


【解决方案1】:

看看这个。您可以使用input data 预先定义schema 并使用explode 炸毁数组并使用pivot 并从struct 中获取元素以创建新列。

        from pyspark.sql import SparkSession
        from pyspark.sql import functions as F
        from pyspark.sql.types import StructType,StructField,StringType,IntegerType,ArrayType

        spark = SparkSession.builder \
            .appName('SO')\
            .getOrCreate()

        spark = SparkSession.builder.getOrCreate()

        schema = StructType([
          StructField("year", IntegerType(), True),
          StructField("month", IntegerType(),  True),
          StructField("p_name", StringType(), True),
          StructField("json_col", ArrayType(StructType([StructField("attri_name", StringType(), True),
                                                        StructField("value", StringType(), True)])))

        ])

        data = [(2010, 5, "rchsc", [{"attri_name": "in_market", "value": "yes"}, {"attri_name": "weight", "value": "12.56"}, {"attri_name" : "color", "value" : "red"}])]

        df = spark.createDataFrame(data,schema)

        df.show(truncate=False)

        # +----+-----+------+-------------------------------------------------+
        # |year|month|p_name|json_col                                         |
        # +----+-----+------+-------------------------------------------------+
        # |2010|5    |rchsc |[[in_market, yes], [weight, 12.56], [color, red]]|
        # +----+-----+------+-------------------------------------------------+



        df1 = df.select("year","month", "p_name", F.explode("json_col"))

        df2 = df1.groupBy("year", "month", "p_name").pivot("col.attri_name").agg(F.first("col.value"))

        df2.show()

        # +----+-----+------+-----+---------+------+
        # |year|month|p_name|color|in_market|weight|
        # +----+-----+------+-----+---------+------+
        # |2010|    5| rchsc|  red|      yes| 12.56|
        # +----+-----+------+-----+---------+------+

【讨论】:

  • 我将代码分解为 df1 和 df2 只是为了更好地理解。理想情况下,您可以将它们链接在一起,形成 df1。
【解决方案2】:

定义 schemaArrayType 因为你在 json 中有数组,然后是 explodepivot 列。

Example:

df.show()
#+----+-----+------+------------------------------------------------------------------------------------------------------------------------------------+
#|year|month|p_name|json_col                                                                                                                            |
#+----+-----+------+------------------------------------------------------------------------------------------------------------------------------------+
#|2010|05   |rchsc |[{"attri_name": "in_market", "value": "yes"}, {"attri_name": "weight", "value": "12.56"}, {"attri_name" : "color", "value" : "red"}]|
#+----+-----+------+------------------------------------------------------------------------------------------------------------------------------------+
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(StructType(
   [
     StructField('attri_name', StringType(), True),
    StructField('value', StringType(), True)
   ]
 ))

df.withColumn("ff",from_json(col("json_col"),schema)).\
selectExpr("*","explode(ff)").\
select("*","col.*").\
drop(*["json_col","ff","col"]).\
groupBy("year","month","p_name").\
pivot("attri_name").\
agg(first(col("value"))).\
show()
#+----+-----+------+-----+---------+------+
#|year|month|p_name|color|in_market|weight|
#+----+-----+------+-----+---------+------+
#|2010|   05| rchsc|  red|      yes| 12.56|
#+----+-----+------+-----+---------+------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-04-06
    • 2022-08-18
    • 2023-04-02
    • 1970-01-01
    • 2020-11-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多