【问题标题】:Spark dropduplicates but choose column with nullSpark dropduplicates,但选择空列
【发布时间】:2020-03-06 10:06:36
【问题描述】:

我有一张这样的表:

+---------+-------------+--------------+-----------+--------+--------------+--------------+
| cust_num|valid_from_dt|valid_until_dt|cust_row_id| cust_id|insert_load_dt|update_load_dt|
+---------+-------------+--------------+-----------+--------+--------------+--------------+
|950379405|   2018-08-24|    2018-08-24|   06885247|06885247|    2018-08-24|    2018-08-25|
|950379405|   2018-08-25|    2018-08-28|   06885247|06885247|    2018-08-25|    2018-08-29|
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|    2019-12-17|<- pair 1
|950379405|   2018-08-29|    2019-12-16|   27344328|06885247|    2018-08-29|              |<- pair 1
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|              |<- pair 2
|950379405|   2019-12-17|    2019-12-24|   91778710|06885247|    2019-12-17|    2019-12-25|<- pair 2
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|    2019-12-26|<- pair 3 
|950379405|   2019-12-25|    2019-12-25|   08396180|06885247|    2019-12-25|              |<- pair 3

如您所见,我的表中有一些重复的行,它们仅在 update_load_dt 为空或带有日期时有所不同。
我想以这种方式在我的数据框中删除重复项:

cable_dv_customer_fixed.dropDuplicates(['cust_num',
'valid_from_dt',
'valid_until_dt',
'cust_row_id',
'cust_id'])

但我想保留该行的更多信息。
我的意思是我想保留这一行where update_load_dt &lt;&gt; ''

是否可以修改 dropduplicates() 函数以便我可以从重复项中选择要选择的行?还是有其他(更好的)方法可以做到这一点?

【问题讨论】:

    标签: pyspark apache-spark-sql


    【解决方案1】:

    您可以为此使用窗口函数。但是,使用大数据可能会很慢。

    import pyspark.sql.function as F
    from pyspark.sql.window import Window
    
    df.withColumn("row_number", F.row_number().over(Window.partitionBy(<cols>).orderBy(F.asc_null_last("update_load_dt"))))
    .filter("row_number = 1")
    .drop("row_number") # optional
    

    【讨论】:

      【解决方案2】:

      这就是我的处理方式,F.max() 会做你想做的事并保持最高值的行。 (如果有多个,则在日期 col max() 上保留最新的日期条目)。

      from pyspark.sql.window import Window
      key_cols = ['cust_num','valid_from_dt','valid_until_dt','cust_row_id','cust_id']
      w = Window.partitionBy(key_cols)
      
      df.withColumn('update_load_dt', F.max('update_load_dt').over(w)).dropDuplicates(key_cols)
      

      我使用超过 10 亿行,这并不慢。 让我知道这是否有帮助!

      【讨论】:

        猜你喜欢
        • 2017-12-30
        • 2016-06-10
        • 1970-01-01
        • 1970-01-01
        • 2017-11-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多