【问题标题】:Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame根据 RDD/Spark DataFrame 中的特定列从行中删除重复项
【发布时间】:2015-07-26 17:05:35
【问题描述】:

假设我有一个相当大的数据集,格式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

我想做的是仅根据第一、第三和第四列的值删除重复的行。

删除完全重复的行很简单:

data = data.distinct()

第 5 行或第 6 行将被删除

但是我如何只删除基于第 1、3 和 4 列的重复行?即删除其中之一:

('Baz',22,'US',6)
('Baz',36,'US',6)

在 Python 中,这可以通过使用 .drop_duplicates() 指定列来完成。如何在 Spark/Pyspark 中实现相同的目标?

【问题讨论】:

    标签: apache-spark apache-spark-sql pyspark


    【解决方案1】:

    Pyspark 确实包含一个dropDuplicates() 方法,该方法是在 1.4 中引入的。 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([ \
    ...     Row(name='Alice', age=5, height=80), \
    ...     Row(name='Alice', age=5, height=80), \
    ...     Row(name='Alice', age=10, height=80)]).toDF()
    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    | 10|    80|Alice|
    +---+------+-----+
    
    >>> df.dropDuplicates(['name', 'height']).show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    +---+------+-----+
    

    【讨论】:

    • 有没有办法捕获它确实丢弃的记录?
    • x = usersDf.drop_duplicates(subset=['DETUserId']) - X 数据框将是所有删除的记录
    • @Rodney 这不是文档所说的:“返回一个删除重复行的新 DataFrame,可选择仅考虑某些列。” spark.apache.org/docs/2.1.0/api/python/…
    【解决方案2】:

    根据您的问题,尚不清楚您要使用哪些列来确定重复项。该解决方案背后的总体思路是根据标识重复项的列的值创建一个键。然后,您可以使用 reduceByKey 或 reduce 操作来消除重复。

    这里有一些代码可以帮助您入门:

    def get_key(x):
        return "{0}{1}{2}".format(x[0],x[2],x[3])
    
    m = data.map(lambda x: (get_key(x),x))
    

    现在,您有一个键值对 RDD,它由第 1,3 和 4 列作为键。 下一步将是reduceByKeygroupByKeyfilter。 这将消除重复。

    r = m.reduceByKey(lambda x,y: (x))
    

    【讨论】:

      【解决方案3】:

      我知道你已经接受了另一个答案,但如果你想这样做 DataFrame,只需使用 groupBy 和 agg。假设您已经创建了一个 DF(列名为“col1”、“col2”等),您可以这样做:

      myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
      

      请注意,在这种情况下,我选择了 col2 的 Max,但您可以选择 avg、min 等。

      【讨论】:

      • 到目前为止,我对 DataFrames 的体验是,它们让一切变得更加优雅和快速。
      • 需要注意的是,这个答案是用 Scala 编写的——对于 pyspark 将 $"col1" 替换为 col("col1") 等。
      【解决方案4】:

      同意大卫的观点。另外,可能不我们想要groupBy聚合函数中除列之外的所有列,即,如果我们想纯粹删除重复项基于列的子集并保留原始数据框中的所有列。因此,更好的方法是使用 Spark 1.4.0 中提供的 dropDuplicates Dataframe api

      参考见:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

      【讨论】:

      • SparkR中有对应的功能吗?
      【解决方案5】:

      我使用了内置函数 dropDuplicates()。下面给出的Scala代码

      val data = sc.parallelize(List(("Foo",41,"US",3),
      ("Foo",39,"UK",1),
      ("Bar",57,"CA",2),
      ("Bar",72,"CA",2),
      ("Baz",22,"US",6),
      ("Baz",36,"US",6))).toDF("x","y","z","count")
      
      data.dropDuplicates(Array("x","count")).show()
      

      输出:

      +---+---+---+-----+
      |  x|  y|  z|count|
      +---+---+---+-----+
      |Baz| 22| US|    6|
      |Foo| 39| UK|    1|
      |Foo| 41| US|    3|
      |Bar| 57| CA|    2|
      +---+---+---+-----+
      

      【讨论】:

      • 该问题专门要求 pyspark 实现,而不是 scala
      【解决方案6】:

      下面的程序将帮助您完全删除重复项,或者如果您想根据某些列删除重复项,您甚至可以这样做:

      import org.apache.spark.sql.SparkSession
      
      object DropDuplicates {
      def main(args: Array[String]) {
      val spark =
        SparkSession.builder()
          .appName("DataFrame-DropDuplicates")
          .master("local[4]")
          .getOrCreate()
      
      import spark.implicits._
      
      // create an RDD of tuples with some data
      val custs = Seq(
        (1, "Widget Co", 120000.00, 0.00, "AZ"),
        (2, "Acme Widgets", 410500.00, 500.00, "CA"),
        (3, "Widgetry", 410500.00, 200.00, "CA"),
        (4, "Widgets R Us", 410500.00, 0.0, "CA"),
        (3, "Widgetry", 410500.00, 200.00, "CA"),
        (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
        (6, "Widget Co", 12000.00, 10.00, "AZ")
      )
      val customerRows = spark.sparkContext.parallelize(custs, 4)
      
      // convert RDD of tuples to DataFrame by supplying column names
      val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
      
      println("*** Here's the whole DataFrame with duplicates")
      
      customerDF.printSchema()
      
      customerDF.show()
      
      // drop fully identical rows
      val withoutDuplicates = customerDF.dropDuplicates()
      
      println("*** Now without duplicates")
      
      withoutDuplicates.show()
      
      val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
      
      println("*** Now without partial duplicates too")
      
      withoutPartials.show()
      
       }
       }
      

      【讨论】:

      • 注释“// 删除完全相同的行”第一次正确,第二次错误。也许是复制/粘贴错误?
      • 感谢@JoshuaStafford,删除了不好的评论。
      【解决方案7】:

      这是我的 Df contains 4 重复了两次,所以这里将删除重复的值。

      scala> df.show
      +-----+
      |value|
      +-----+
      |    1|
      |    4|
      |    3|
      |    5|
      |    4|
      |   18|
      +-----+
      
      scala> val newdf=df.dropDuplicates
      
      scala> newdf.show
      +-----+
      |value|
      +-----+
      |    1|
      |    3|
      |    5|
      |    4|
      |   18|
      +-----+
      

      【讨论】:

      • 你可以在 spark-shell 中检查我已经共享了正确的输出。这个答案与我们如何删除列或 df 中的重复值有关。
      • 您能否提供一个基于 OP 问题的示例?
      • 我已经在我的回答中给出了自己的例子。你可以参考那个。
      • 您的帖子对本次讨论没有任何价值。 @vaerek 已经发布了一个 PySpark df.dropDuplicates() 示例,包括如何将其应用于多个列(我最初的问题)。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-05-31
      • 2020-03-27
      • 2017-11-26
      • 2023-02-14
      • 1970-01-01
      • 2022-01-23
      • 1970-01-01
      相关资源
      最近更新 更多