【Question Title】: Counting distinct values for a given column partitioned by a window function, without using approx_count_distinct()
【Posted】: 2021-03-23 17:47:32
【Question】:

I have the following DataFrame:

val df1 = Seq(("Roger","Rabbit", "ABC123"), ("Roger","Rabit", "ABC123"),("Roger","Rabbit", "ABC123"), ("Trevor","Philips","XYZ987"), ("Trevor","Philips","XYZ987")).toDF("first_name", "last_name", "record")

+----------+---------+------+
|first_name|last_name|record|
+----------+---------+------+
|Roger     |Rabbit   |ABC123|
|Roger     |Rabit    |ABC123|
|Roger     |Rabbit   |ABC123|
|Trevor    |Philips  |XYZ987|
|Trevor    |Philips  |XYZ987|
+----------+---------+------+

I want to group the records in this DataFrame by the record column, and then look for anomalies in the first_name and last_name fields, which should be identical across all records sharing the same record value.

The best approach I have found so far uses approx_count_distinct:

val wind_person = Window.partitionBy("record")

df1.withColumn("unique_fields", concat($"first_name", $"last_name"))
  .withColumn("anomaly", approx_count_distinct($"unique_fields") over wind_person)
  .show(false)


+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
|Roger     |Rabit    |ABC123|RogerRabit   |2      |
|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
+----------+---------+------+-------------+-------+

An anomaly is detected wherever the anomaly column is greater than 1.

The problem is that approx_count_distinct only gives us an approximation, and I am not sure how confident we can be that it will always return an exact count.

Some additional information:

  • The DataFrame may contain more than 500 million records
  • The DataFrame has previously been repartitioned by the record column
  • There are no more than 15 rows for each distinct value of record

Is it safe to rely on approx_count_distinct being 100% accurate in this scenario, or is there a better window function in Spark to achieve this?

【Comments】:

  • To reproduce the scenario, add: import org.apache.spark.sql.functions.approx_count_distinct import org.apache.spark.sql.expressions.Window import spark.implicits._

Tags: scala apache-spark apache-spark-sql


【Solution 1】:

You can take the collect_set of unique_fields over the window wind_person and get its size, which equals the exact distinct count of that field:

df1.withColumn("unique_fields", concat($"first_name", $"last_name"))
  .withColumn("anomaly", size(collect_set($"unique_fields").over(wind_person)))
  .show

//+----------+---------+------+-------------+-------+
//|first_name|last_name|record|unique_fields|anomaly|
//+----------+---------+------+-------------+-------+
//|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
//|Roger     |Rabit    |ABC123|RogerRabit   |2      |
//|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
//|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
//|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
//+----------+---------+------+-------------+-------+
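Why this is exact, not approximate, can be illustrated with a plain-Scala sketch on in-memory collections (no Spark; the data below just mirrors df1): a set holds each value at most once, so its size is the true distinct count for each record group.

```scala
// Plain-Scala sketch of what size(collect_set(unique_fields)) over the
// record window computes: per record group, collect values into a set
// and take its size — an exact distinct count.
val rows = Seq(
  ("ABC123", "RogerRabbit"),
  ("ABC123", "RogerRabit"),
  ("ABC123", "RogerRabbit"),
  ("XYZ987", "TrevorPhilips"),
  ("XYZ987", "TrevorPhilips")
)

val anomalies: Map[String, Int] =
  rows.groupBy(_._1).map { case (record, grp) =>
    record -> grp.map(_._2).toSet.size
  }

// anomalies("ABC123") == 2  (RogerRabbit vs RogerRabit)
// anomalies("XYZ987") == 1
```

Since the question states there are at most 15 rows per record value, the per-window set stays tiny, so collect_set carries no risk of building a huge per-row collection here.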

【Comments】:

【Solution 2】:

You can get an exact countDistinct over a window using a pair of dense_rank operations:

val df2 = df1.withColumn(
    "unique_fields",
    concat($"first_name", $"last_name")
).withColumn(
    "anomaly",
    dense_rank().over(Window.partitionBy("record").orderBy("unique_fields")) +
    dense_rank().over(Window.partitionBy("record").orderBy(desc("unique_fields")))
    - 1
)

df2.show
+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|     Roger|    Rabit|ABC123|   RogerRabit|      2|
|     Roger|   Rabbit|ABC123|  RogerRabbit|      2|
|     Roger|   Rabbit|ABC123|  RogerRabbit|      2|
|    Trevor|  Philips|XYZ987|TrevorPhilips|      1|
|    Trevor|  Philips|XYZ987|TrevorPhilips|      1|
+----------+---------+------+-------------+-------+
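The trick works because, for any row, the ascending and descending dense ranks count positions in the same sorted list of distinct values from opposite ends, so their sum is always D + 1 where D is the number of distinct values in the partition. A plain-Scala sketch (no Spark; the helper below is a hypothetical stand-in for the two dense_rank window calls) verifies the identity on the example groups:

```scala
// For each row: denseRankAsc + denseRankDesc - 1 == number of distinct
// values D in the partition, since the two ranks index the sorted distinct
// values from opposite ends.
def exactDistinctPerRow(values: Seq[String]): Seq[Int] = {
  val sortedDistinct = values.distinct.sorted
  val d = sortedDistinct.size
  values.map { v =>
    val rankAsc  = sortedDistinct.indexOf(v) + 1  // dense_rank, ascending order
    val rankDesc = d - sortedDistinct.indexOf(v)  // dense_rank, descending order
    rankAsc + rankDesc - 1                        // always equals d
  }
}

val abc = exactDistinctPerRow(Seq("RogerRabbit", "RogerRabit", "RogerRabbit"))
val xyz = exactDistinctPerRow(Seq("TrevorPhilips", "TrevorPhilips"))
// abc == List(2, 2, 2), xyz == List(1, 1)
```

Note this answer needs two window sorts instead of one unsorted pass, but like Solution 1 it returns an exact count, unlike the HyperLogLog-based approx_count_distinct.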
    

【Comments】:
