【问题标题】:Remove duplicates in Pair RDD based on Values根据值删除 Pair RDD 中的重复项
【发布时间】:2017-11-08 07:29:39
【问题描述】:

我有一个包含多行的 RDD,如下所示。

val row = [(String, String), (String, String, String)]

该值是一个元组序列。在元组中,最后一个字符串是时间戳,第二个是类别。我想根据每个类别的最大时间戳过滤这个序列。

(A,B)       Id      Category        Timestamp
-------------------------------------------------------
(123,abc)   1       A              2016-07-22 21:22:59+0000
(234,bcd)   2       B              2016-07-20 21:21:20+0000
(123,abc)   1       A              2017-07-09 21:22:59+0000
(345,cde)   4       C              2016-07-05 09:22:30+0000
(456,def)   5       D              2016-07-21 07:32:06+0000
(234,bcd)   2       B              2015-07-20 21:21:20+0000

我希望每个类别都有一行。我正在寻求有关获取每个类别的最大时间戳行的帮助。我希望得到的结果是

(A,B)       Id      Category        Timestamp
-------------------------------------------------------
(234,bcd)   2       B              2016-07-20 21:21:20+0000
(123,abc)   1       A              2017-07-09 21:22:59+0000
(345,cde)   4       C              2016-07-05 09:22:30+0000
(456,def)   5       D              2016-07-21 07:32:06+0000

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    给定输入dataframe

    +---------+---+--------+------------------------+
    |(A,B)    |Id |Category|Timestamp               |
    +---------+---+--------+------------------------+
    |[123,abc]|1  |A       |2016-07-22 21:22:59+0000|
    |[234,bcd]|2  |B       |2016-07-20 21:21:20+0000|
    |[123,abc]|1  |A       |2017-07-09 21:22:59+0000|
    |[345,cde]|4  |C       |2016-07-05 09:22:30+0000|
    |[456,def]|5  |D       |2016-07-21 07:32:06+0000|
    |[234,bcd]|2  |B       |2015-07-20 21:21:20+0000|
    +---------+---+--------+------------------------+
    

    您可以执行以下操作以获得您需要的结果dataframe

    import org.apache.spark.sql.functions._
    val requiredDataframe = df.orderBy($"Timestamp".desc).groupBy("Category").agg(first("(A,B)").as("(A,B)"), first("Id").as("Id"), first("Timestamp").as("Timestamp"))
    

    您应该拥有requiredDataframe

    +--------+---------+---+------------------------+
    |Category|(A,B)    |Id |Timestamp               |
    +--------+---------+---+------------------------+
    |B       |[234,bcd]|2  |2016-07-20 21:21:20+0000|
    |D       |[456,def]|5  |2016-07-21 07:32:06+0000|
    |C       |[345,cde]|4  |2016-07-05 09:22:30+0000|
    |A       |[123,abc]|1  |2017-07-09 21:22:59+0000|
    +--------+---------+---+------------------------+
    

    您可以使用Window 函数来做同样的事情,如下所示

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    val windowSpec = Window.partitionBy("Category").orderBy($"Timestamp".desc)
    df.withColumn("rank", rank().over(windowSpec)).filter($"rank" === lit(1)).drop("rank")
    

    【讨论】:

    • 我想在 RDD 而不是数据帧上执行这些操作。所以我想知道是否需要以不同的方式完成?
    猜你喜欢
    • 1970-01-01
    • 2015-07-26
    • 1970-01-01
    • 1970-01-01
    • 2021-01-10
    • 1970-01-01
    • 1970-01-01
    • 2016-03-03
    • 1970-01-01
    相关资源
    最近更新 更多