【问题标题】:Updating rows based on the next time a specific value occurs in a dataframe pyspark根据下一次特定值出现在数据框pyspark中更新行
【发布时间】:2025-12-18 17:55:01
【问题描述】:

如果我有这样的数据框

    data = [(("ID1", "ENGAGEMENT", 2019-03-03)), (("ID1", "BABY SHOWER", 2019-04-13)), (("ID1", "WEDDING", 2019-07-10)), 
           (("ID1", "DIVORCE", 2019-09-26))]
    df = spark.createDataFrame(data, ["ID", "Event", "start_date"])
    df.show()
    
    +---+-----------+----------+
    | ID|      Event|start_date|
    +---+-----------+----------+
    |ID1| ENGAGEMENT|2019-03-03|
    |ID1|BABY SHOWER|2019-04-13|
    |ID1|    WEDDING|2019-07-10|
    |ID1|    DIVORCE|2019-09-26|
    +---+-----------+----------+

从该数据框中,必须根据后续事件的开始日期推断事件的结束日期

例如:如果您有一个订婚,那么这将在婚礼结束时结束,因此您可以将婚礼的开始日期作为订婚的结束日期。

所以上面的数据框应该得到这个输出。

+---+-----------+----------+----------+
| ID|      Event|start_date|  end_date|
+---+-----------+----------+----------+
|ID1| ENGAGEMENT|2019-03-03|2019-07-10|
|ID1|BABY SHOWER|2019-04-13|2019-04-13|
|ID1|    WEDDING|2019-07-10|2019-09-26|
|ID1|    DIVORCE|2019-09-26|      NULL|
+---+-----------+----------+----------+

我最初尝试在由 ID 划分的窗口上使用前导函数来获取前面的行,但因为它可能是 20 行之后“婚礼”事件将不起作用并且是一种非常混乱的方式去做。

df = df.select("*", *([f.lead(f.col(c),default=None).over(Window.orderBy("ID")).alias("LEAD_"+c) 
                      for c in ["Event", "start_date"]]))

activity_dates = activity_dates.select("*", *([f.lead(f.col(c),default=None).over(Window.orderBy("ID")).alias("LEAD_"+c) 
                      for c in ["LEAD_Event", "LEAD_start_date"]]))


df = df.withColumn("end_date", f.when((col("Event") == "ENGAGEMENT") & (col("LEAD_Event") == "WEDDING"), col("LEAD_start_date"))
                                .when((col("Event") == "ENGAGEMENT") & (col("LEAD_LEAD_Event") == "WEDDING"), col("LEAD_LEAD_start_date"))

如何在不循环数据集的情况下实现这一点?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-dataframes


    【解决方案1】:

    这是我的尝试。

    from pyspark.sql import Window
    from pyspark.sql.functions import *
    
    df.withColumn('end_date', expr('''
        case when Event = 'ENGAGEMENT'  then first(if(Event = 'WEDDING', start_date, null), True) over (Partition By ID)
             when Event = 'BABY SHOWER' then first(if(Event = 'BABY SHOWER', start_date, null), True) over (Partition By ID)
             when Event = 'WEDDING'     then first(if(Event = 'DIVORCE', start_date, null), True) over (Partition By ID)
        else null end
    ''')).show()
    
    +---+-----------+----------+----------+
    | ID|      Event|start_date|  end_date|
    +---+-----------+----------+----------+
    |ID1| ENGAGEMENT|2019-03-03|2019-07-10|
    |ID1|BABY SHOWER|2019-04-13|2019-04-13|
    |ID1|    WEDDING|2019-07-10|2019-09-26|
    |ID1|    DIVORCE|2019-09-26|      null|
    +---+-----------+----------+----------+
    
    

    【讨论】: