【问题标题】:Pyspark Dataframe group by filteringPyspark Dataframe 组通过过滤
【发布时间】:2017-03-16 06:10:29
【问题描述】:

我有一个如下的数据框

cust_id   req    req_met
-------   ---    -------
 1         r1      1
 1         r2      0
 1         r2      1
 2         r1      1
 3         r1      1
 3         r2      1
 4         r1      0
 5         r1      1
 5         r2      0
 5         r1      1

我必须查看客户,看看他们有多少要求,看看他们是否至少满足过一次。可以有多个具有相同客户和要求的记录,一个满足和不满足。在上述情况下,我的输出应该是

cust_id
-------
  1
  2
  3

我所做的是

# say initial dataframe is df
df1 = df\
    .groupby('cust_id')\
    .countdistinct('req')\
    .alias('num_of_req')\
    .sum('req_met')\
    .alias('sum_req_met')

df2 = df1.filter(df1.num_of_req == df1.sum_req_met)

但在少数情况下它没有得到正确的结果

如何做到这一点?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    首先,我将准备上面给出的玩具数据集,

    from pyspark.sql.functions import col
    import pyspark.sql.functions as fn
    
    df = spark.createDataFrame([[1, 'r1', 1],
     [1, 'r2', 0],
     [1, 'r2', 1],
     [2, 'r1', 1],
     [3, 'r1', 1],
     [3, 'r2', 1],
     [4, 'r1', 0],
     [5, 'r1', 1],
     [5, 'r2', 0],
     [5, 'r1', 1]], schema=['cust_id', 'req', 'req_met'])
    df = df.withColumn('req_met', col("req_met").cast(IntegerType()))
    df = df.withColumn('cust_id', col("cust_id").cast(IntegerType()))
    

    我按cust_idreq 分组做同样的事情,然后数req_met。之后,我创建函数将这些要求降至 0、1

    def floor_req(r):
        if r >= 1:
            return 1
        else:
            return 0
    udf_floor_req = udf(floor_req, IntegerType())
    gr = df.groupby(['cust_id', 'req'])
    df_grouped = gr.agg(fn.sum(col('req_met')).alias('sum_req_met'))
    df_grouped_floor = df_grouped.withColumn('sum_req_met', udf_floor_req('sum_req_met'))
    

    现在,我们可以通过计算不同的需求数量和满足的需求总数来检查每个客户是否满足了所有需求。

    df_req = df_grouped_floor.groupby('cust_id').agg(fn.sum('sum_req_met').alias('sum_req'), 
                                                     fn.count('req').alias('n_req'))
    

    最后,你只需要检查两列是否相等:

    df_req.filter(df_req['sum_req'] == df_req['n_req'])[['cust_id']].orderBy('cust_id').show()
    

    【讨论】:

      【解决方案2】:
       select cust_id from  
      (select cust_id , MIN(sum_value) as m from 
      ( select cust_id,req ,sum(req_met) as sum_value from <data_frame> group by cust_id,req )
       temp group by cust_id )temp1 
      where m>0 ;
      

      这会得到想要的结果

      【讨论】:

      • 感谢您的解决方案。我看起来更像是数据框
      【解决方案3】:

      这是一个没有任何udf的方法。有点棘手。基本上按 cust_id 分组,req 完成,然后找到 req_met 的总和。然后去掉sum == 0的cust_id。

      df.filter( ~df.cust_id.isin([x[0] for x in df.groupby('cust_id','req').agg(F.sum('req_met').alias('sum_req_met')).filter(col('sum_req_met')==0).select('cust_id').collect()]) ).select('cust_id').distinct().show()
      

      【讨论】:

        猜你喜欢
        • 2021-08-31
        • 1970-01-01
        • 2018-12-24
        • 2016-05-13
        • 1970-01-01
        • 2018-03-16
        • 2020-10-28
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多