【问题标题】:pyspark dataframe with json column to aggregate the json elements into a new column and remove duplicated带有 json 列的 pyspark 数据框将 json 元素聚合到一个新列中并删除重复的
【发布时间】:2020-11-23 00:28:44
【问题描述】:

我正在尝试在 databricks 上读取带有 json 列的 pyspark 数据框。

数据框:

  year month json_col
  2010 09    [{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]
  2010 09    [{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]
  2007 10    [{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]
  2007 10    [{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]

我需要一个新的数据框,其中所有重复的“p_id”都被删除并按年和月汇总

  year month p_id (string)
  2010 09    ["vfdvtbe", "cdscs", "usdvwq", "ujhbe", "yjev"]
  2007 10    ["ukerge", "ikrtw", "ikwca", "unvwq", "cqwcq"]

新列“p_id”是一个数组字符串。 我想计算每年和每月有多少不同的“p_id”以及它们中有多少。 并且,还要去除出现在同一年同月的重复元素。

我的代码:

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(StructType(
[
   StructField('p_id', StringType(), True)
]
))

schema = ArrayType(MapType(StringType(),StringType()))

t = ff.withColumn("data",F.explode(F.from_json(F.col("json_col"),schema))).withColumn("data",F.when(F.col("data")["product_id"].cast("string").isNotNull(),F.col("data")["product_id"])).filter(F.col("data").isNotNull()).drop("json_col")


display(t)

我不确定这是否可以删除重复项?

谢谢

【问题讨论】:

    标签: json dataframe apache-spark pyspark


    【解决方案1】:

    在这种情况下使用 flatten, array_distinctgroupBy, collect_list 函数。

    Example:

    df.show(10,False)
    #+----+-----+---------------------------------------------------------+
    #|year|month|json_col                                                 |
    #+----+-----+---------------------------------------------------------+
    #|2010|09   |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
    #|2010|09   |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]    |
    #|2007|10   |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]  |
    #|2007|10   |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]   |
    #+----+-----+---------------------------------------------------------+
    
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    
    schema = ArrayType(StructType(
    [
       StructField('p_id', StringType(), True)
    ]
    ))
    
    
    df1=df.withColumn("ff",from_json(col("json_col"),schema)).\
    select("year","month",expr('transform(ff , f -> f.p_id)').alias("tmp"))
    
    df1.groupBy("year","month").\
    agg(to_json(array_distinct(flatten(collect_list(col("tmp"))))).alias("p_id")).\
    show(10,False)
    #+----+-----+-------------------------------------------+
    #|year|month|p_id                                       |
    #+----+-----+-------------------------------------------+
    #|2010|09   |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
    #|2007|10   |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
    #+----+-----+-------------------------------------------+
    

    【讨论】:

      猜你喜欢
      • 2019-10-08
      • 2020-11-17
      • 2021-05-28
      • 2020-06-11
      • 1970-01-01
      • 2018-02-17
      • 2015-04-18
      • 2020-08-16
      • 2021-11-18
      相关资源
      最近更新 更多