【问题标题】:Keep only duplicates from a DataFrame regarding some field仅保留 DataFrame 中有关某些字段的重复项
【发布时间】:2018-03-29 15:34:27
【问题描述】:

我有这个 spark DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|AUTRE|     2|null|    08:58:00|    23:29:00|
|TDR|  QWA|     3|null|    08:57:00|    23:28:00|
|ALT| TEST|     4|null|    08:56:00|    23:27:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

我想获得一个新的数据框,其中仅包含 "ID""ID2""Number" 三个字段不唯一的行。

表示我想要这个DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

或者可能是一个包含所有重复项的数据框:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

【问题讨论】:

    标签: apache-spark pyspark spark-dataframe


    【解决方案1】:

    执行此操作的一种方法是使用pyspark.sql.Window 添加一列,该列计算每行的("ID", "ID2", "Name") 组合的重复数。然后只选择重复数大于1的行。

    import pyspark.sql.functions as f
    from pyspark.sql import Window
    
    w = Window.partitionBy('ID', 'ID2', 'Number')
    df.select('*', f.count('ID').over(w).alias('dupeCount'))\
        .where('dupeCount > 1')\
        .drop('dupeCount')\
        .show()
    #+---+---+------+----+------------+------------+
    #| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
    #+---+---+------+----+------------+------------+
    #|ALT|QWA|     2|null|    08:54:00|    23:25:00|
    #|ALT|QWA|     2|null|    08:53:00|    23:24:00|
    #|ALT|QWA|     6|null|    08:59:00|    23:30:00|
    #|ALT|QWA|     6|null|    08:55:00|    23:26:00|
    #+---+---+------+----+------------+------------+
    

    我使用pyspark.sql.functions.count() 来计算每个组中的项目数。这将返回一个包含所有重复项的 DataFrame(您显示的第二个输出)。

    如果您只想获得每个("ID", "ID2", "Name") 组合的一行,您可以使用另一个窗口来对行进行排序。

    例如,下面我为row_number添加另一列,并仅选择重复计数大于1且行数等于1的行。这样可以保证每个分组一个行。

    w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
    df.select(
            '*',
            f.count('ID').over(w).alias('dupeCount'),
            f.row_number().over(w2).alias('rowNum')
        )\
        .where('(dupeCount > 1) AND (rowNum = 1)')\
        .drop('dupeCount', 'rowNum')\
        .show()
    #+---+---+------+----+------------+------------+
    #| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
    #+---+---+------+----+------------+------------+
    #|ALT|QWA|     2|null|    08:54:00|    23:25:00|
    #|ALT|QWA|     6|null|    08:59:00|    23:30:00|
    #+---+---+------+----+------------+------------+
    

    【讨论】:

      【解决方案2】:

      这是一种无需 Window 的方法。

      具有重复项的 DataFrame

      df.exceptAll(df.drop_duplicates(['ID', 'ID2', 'Number'])).show()
      # +---+---+------+------------+------------+
      # | ID|ID2|Number|Opening_Hour|Closing_Hour|
      # +---+---+------+------------+------------+
      # |ALT|QWA|     2|    08:53:00|    23:24:00|
      # |ALT|QWA|     6|    08:55:00|    23:26:00|
      # +---+---+------+------------+------------+
      

      包含所有重复项的 DataFrame(使用 left_anti 连接)

      df.join(df.groupBy('ID', 'ID2', 'Number')\
                .count().where('count = 1').drop('count'),
              on=['ID', 'ID2', 'Number'],
              how='left_anti').show()
      # +---+---+------+------------+------------+
      # | ID|ID2|Number|Opening_Hour|Closing_Hour|
      # +---+---+------+------------+------------+
      # |ALT|QWA|     2|    08:54:00|    23:25:00|
      # |ALT|QWA|     2|    08:53:00|    23:24:00|
      # |ALT|QWA|     6|    08:59:00|    23:30:00|
      # |ALT|QWA|     6|    08:55:00|    23:26:00|
      # +---+---+------+------------+------------+
      

      【讨论】:

      • 请注意,虽然使用exceptAll() 方法似乎比window 更容易实现,但它效率极低且计算量很大。
      【解决方案3】:

      要扩展 pault 的 really great answer:我经常需要将数据帧子集化为仅重复 x 次的条目,并且由于我需要经常这样做,所以我将它变成了一个函数,我只需导入大量我脚本开头的其他辅助函数:

      import pyspark.sql.functions as f
      from pyspark.sql import Window
      def get_entries_with_frequency(df, cols, num):
        """
        This function will filter the dataframe df down to all the rows that
        have the same values in cols num times. Example: If num=3, col="cartype", 
        then the function will only return rows where a certain cartype occurs exactly 3 times
        in the dataset. If col "cartype" contains the following:
        ["Mazda", "Seat", "Seat", "VW", "Mercedes", "VW", "VW", "Mercedes", "Seat"],
        then the function will only return rows containing "VW" or "Seat" 
        since these occur exactly 3 times.
      
        df: Pyspark dataframe
        cols: Either string column name or list of strings for multiple columns.
        num: int - The exact number of times a value (or combination of values,
             if cols is a list) has to appear in df.
        """
        if type(cols)==str:
          cols = [cols]
        w = Window.partitionBy(cols)
        return df.select('*', f.count(cols[0]).over(w).alias('dupeCount'))\
                 .where("dupeCount = {}".format(num))\
                 .drop('dupeCount')
      

      【讨论】:

      • 试想一下,如果get_entries_with_frequency 有参数的文档字符串,这个答案会有多棒,所以除了@Thomas 之外的其他人可以使用它。
      • @pauljohn32 自从我写这篇文章以来已经有一段时间了,我发现很难解释它到底做了什么,但我已经尽力了 ;-)
      猜你喜欢
      • 1970-01-01
      • 2021-10-05
      • 2020-05-31
      • 2016-11-21
      • 1970-01-01
      • 2015-10-26
      • 2016-07-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多