【问题标题】:Perform a scan operation in a group of records in Pyspark Dataframe (Spark v1.6)在 Pyspark Dataframe (Spark v1.6) 中的一组记录中执行扫描操作
【发布时间】:2019-09-18 04:15:20
【问题描述】:

专家,我有一个要求,我需要在一组记录中对 Pyspark 数据帧执行“扫描和操作”操作。这是我的 pyspark 数据框(Spark 1.6 版),有 2 个字段-

col1     effective_date
A        2019-02-01
A        2019-02-03
A        2019-02-05
A        2019-02-07

期望的结果是-

col1     effective_date expiry_date
A        2019-02-01     2019-02-02
A        2019-02-03     2019-02-04
A        2019-02-05     2019-02-06
A        2019-02-07     2999-12-31

这是一个典型的 SCD2(缓慢变化的维度)操作,我的记录应该按有效日期(升序)和 col1 值的“组”排序, 每条记录都应该有一个下一条记录的到期日期(有效日期-1),并且组中具有最大有效日期的最后一条记录应该有一个假定的到期日期“2999-12-31” 因为它是最新的记录并且它还没有未来的实例,但是如果将来确实出现了某些东西,它应该会过期。 例如-

col1     effective_date expiry_date
A        2019-02-01     2019-02-02
A        2019-02-03     2019-02-04
A        2019-02-05     2019-02-06
A        2019-02-07     2019-02-08
A        2019-02-09     2999-12-31

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    你要使用的是窗口函数

    from pyspark.sql import Window
    from pyspark.sql import functions as F
    
    w = Window.partitionBy("col1").orderBy("effective_date")
    df = df.withColumn("expiry_date", F.date_sub(F.lead("effective_date").over(w), 1))
    

    【讨论】: