【问题标题】:How to execute custom logic at pyspark window partition如何在pyspark窗口分区执行自定义逻辑
【发布时间】:2025-12-27 10:25:16
【问题描述】:

我有一个如下所示格式的数据框,其中我们将有多个DEPNAME 条目,如下所示,我的要求是在DEPNAME 级别设置result = Y,如果flag_1flag_2= Y,如果标志即flag_1flag_2 = N,则结果将设置为N,如DEPNAME=personnel 所示

我能够使用连接获得所需的结果,但我很好奇我们是否可以使用窗口函数来实现它,因为数据集的大小非常大。

+---------+------+------+-+------+
|  depName|flag_1|flag_2| result |
+---------+------+------+-+------+
|    sales|    N|  Y    |  Y    |
|    sales|    N|  N    |  Y    |
|    sales|    N|  N    |  Y    |
|personnel|    N|  N    |  N    |
|personnel|    N|  N    |  N    |
|  develop|    Y|  N    |  Y    |
|  develop|    N|  N    |  Y    |
|  develop|    N|  N    |  Y    |
|  develop|    N|  N    |  Y    |
|  develop|    N|  N    |  Y    |
+---------+-----+------+ +------+

【问题讨论】:

    标签: python sql dataframe apache-spark pyspark


    【解决方案1】:

    这回答了问题的原始版本。

    这看起来像 case 表达式:

    select t.*,
           (case when flag_1 = 'Y' or flag_2 = 'Y'
                 then 'Y' else 'N'
            end) as result
    

    对于更新版本:

    select t.*,
           max(case when flag_1 = 'Y' or flag_2 = 'Y'
                    then 'Y' else 'N'
               end) over (partition by depname) as result
    

    【讨论】:

    • 它应该在分区depname级别,如果任何标志为Y,则将该分区的所有行设置为Y,否则为N。
    • @nilesh1212 。 . .通过更改问题使答案无效是不礼貌的。更好的方法是简单地提出一个新问题。但是,我已经使用符合您指定的版本更新了答案。
    • 是的,很抱歉,我从 cmets 了解到用户无法理解问题(我的错误),因此我更新了问题,第一个版本的问题没有回应我的意思说。
    【解决方案2】:

    如果您正在使用 PySpark(因为您将其包含在标签中)并说您的数据框称为 df,您可以使用

    import pyspark.sql.functions as F
    from pyspark.sql.window import Window
    
    w = Window.partitionBy('depName')
    
    df = df\
      .withColumn('cnt', F.sum(F.when((F.col('flag_1') == 'Y') | (F.col('flag_2') == 'Y'), 1).otherwise(0)).over(w))\
      .withColumn('result', F.when(F.col('cnt') >= 1, 'Y').otherwise('N'))
    
    df.show()
    
    +---------+------+------+---+------+
    |  depName|flag_1|flag_2|cnt|result|
    +---------+------+------+---+------+
    |  develop|     Y|     N|  1|     Y|
    |  develop|     N|     N|  1|     Y|
    |  develop|     N|     N|  1|     Y|
    |  develop|     N|     N|  1|     Y|
    |  develop|     N|     N|  1|     Y|
    |personnel|     N|     N|  0|     N|
    |personnel|     N|     N|  0|     N|
    |    sales|     N|     Y|  1|     Y|
    |    sales|     N|     N|  1|     Y|
    |    sales|     N|     N|  1|     Y|
    +---------+------+------+---+------+
    

    基本上,在由depName 确定的每个分区中,您计算​​条件flag_1 == 'Y' | flag_2 == 'Y' 出现的次数,并将其存储在cnt 中以用于该分区的所有行。
    然后,您使用一个简单的.when 指示所有具有cnt >= 1 的组。

    【讨论】:

    • 如果任何标志列是 Y,我需要在 depName 分区级别执行此操作。它将将该 depName 分区的结果设置为 Y。不在行级别
    • @nilesh1212 在您的问题的先前版本中并不清楚。我现在更新了我的答案。
    最近更新 更多