【Title】: Group by and filter a PySpark data frame
【Posted】: 2019-10-04 17:20:13
【Question】:

I have a PySpark data frame with 3 columns. Some rows are identical in 2 of the columns but differ in the third, see the example below.

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[2,3]           |
---------------------------------------- 
Joe        | Smith     |[2,3,5,6]       |
---------------------------------------- 
Jim        | Bush      |[9,7]           |
---------------------------------------- 
Jim        | Bush      |[21]            |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
----------------------------------------   

I want to group the rows by the {first_name, last_name} columns and keep only the row with the largest number of {requests_ID}. So the result should be:

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[2,3,5,6]       |
---------------------------------------- 
Jim        | Bush      |[9,7]           |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
---------------------------------------- 

I tried the following, but it gives me a nested array of both rows in the group-by instead of the longest one:

gr_df = filtered_df.groupBy("first_name", "last_name").agg(F.collect_set("requests_ID").alias("requests_ID")) 

This is the result I get:

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[[2,3],[2,3,5,6]]|
---------------------------------------- 
Jim        | Bush      |[[9,7],[21]]    |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
---------------------------------------- 

【Comments】:

    Tags: python dataframe pyspark


    【Solution 1】:

    You can use size to determine the length of the array column, together with a window, as follows:

    Import and create a sample DataFrame:

    import pyspark.sql.functions as f
    from pyspark.sql.window import Window

    # recreate the example data from the question
    df = spark.createDataFrame([('Joe', 'Smith', [2, 3]),
                                ('Joe', 'Smith', [2, 3, 5, 6]),
                                ('Jim', 'Bush', [9, 7]),
                                ('Jim', 'Bush', [21]),
                                ('Sarah', 'Wood', [2, 3])],
                               ('first_name', 'last_name', 'requests_ID'))
    

    Define a window that assigns a row number within each (first_name, last_name) group, ordered by the length of the requests_ID column in descending order.

    Here, f.size("requests_ID") gives the length of the requests_ID array, and desc() sorts by it in descending order.

    w_spec = Window().partitionBy("first_name", "last_name").orderBy(f.size("requests_ID").desc())
    

    Apply the window function and keep only the first row of each group:

    df.withColumn("rn", f.row_number().over(w_spec)).where("rn ==1").drop("rn").show()
    +----------+---------+------------+
    |first_name|last_name| requests_ID|
    +----------+---------+------------+
    |       Jim|     Bush|      [9, 7]|
    |     Sarah|     Wood|      [2, 3]|
    |       Joe|    Smith|[2, 3, 5, 6]|
    +----------+---------+------------+
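
    For comparison, here is a minimal sketch of the same idea without a window function (this is an addition, not part of the original answer): compute the maximum array length per (first_name, last_name) group and join it back. Unlike row_number, this keeps every tied row if two arrays share the maximum length.

    sizes = df.withColumn("sz", f.size("requests_ID"))

    # largest array length per name pair
    max_sizes = sizes.groupBy("first_name", "last_name").agg(f.max("sz").alias("max_sz"))

    # keep only the rows whose array length equals the group maximum
    result = (sizes.join(max_sizes, on=["first_name", "last_name"])
                   .where(f.col("sz") == f.col("max_sz"))
                   .drop("sz", "max_sz"))
    result.show()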
    

    【Discussion】:

      【Solution 2】:

      To continue from your current df, which looks like this:

      ----------------------------------------
      first_name | last_name | requests_ID    |
      ----------------------------------------
      Joe        | Smith     |[[2,3],[2,3,5,6]]|
      ---------------------------------------- 
      Jim        | Bush      |[[9,7],[21]]    |
      ---------------------------------------- 
      Sarah      | Wood      |[2,3]           |
      ---------------------------------------- 
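
      For testing, this nested df can be reproduced from the question's sample data with the collect_set aggregation the question already uses (a sketch; spark is assumed to be an active SparkSession):

      import pyspark.sql.functions as F

      raw = spark.createDataFrame([('Joe', 'Smith', [2, 3]),
                                   ('Joe', 'Smith', [2, 3, 5, 6]),
                                   ('Jim', 'Bush', [9, 7]),
                                   ('Jim', 'Bush', [21]),
                                   ('Sarah', 'Wood', [2, 3])],
                                  ('first_name', 'last_name', 'requests_ID'))

      # each group now holds a nested array of arrays, as shown above
      df = raw.groupBy('first_name', 'last_name').agg(F.collect_set('requests_ID').alias('requests_ID'))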
      

      Try this:

      import pyspark.sql.functions as F
      from pyspark.sql.types import IntegerType, ArrayType

      def myfunc(x):
        # length of each inner array
        temp = []
        for arr in x:
          temp.append(len(arr))

        # index of the longest inner array
        max_ind = temp.index(max(temp))

        return x[max_ind]

      udf_extract = F.udf(myfunc, ArrayType(IntegerType()))

      df = df.withColumn('new_requests_ID', udf_extract('requests_ID'))

      #df.show()
      

      Or, without the intermediate udf variable, using the decorator form:

      import pyspark.sql.functions as F
      from pyspark.sql.types import IntegerType, ArrayType

      # declare the return type explicitly; a bare @F.udf defaults to StringType
      @F.udf(returnType=ArrayType(IntegerType()))
      def myfunc(x):
        # length of each inner array
        temp = []
        for arr in x:
          temp.append(len(arr))

        # index of the longest inner array
        max_ind = temp.index(max(temp))

        return x[max_ind]

      df = df.withColumn('new_requests_ID', myfunc('requests_ID'))

      #df.show()
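
      If the goal is the exact output format asked for in the question, the original nested column can then be dropped and the new one renamed back (a small sketch, assuming the df and column names from above):

      # keep only the longest list per group and restore the original column name
      df = df.drop('requests_ID').withColumnRenamed('new_requests_ID', 'requests_ID')

      df.show(truncate=False)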
      

      【Discussion】:
