【发布时间】:2021-03-23 17:47:32
【问题描述】:
我有以下数据框:
val df1 = Seq(("Roger","Rabbit", "ABC123"), ("Roger","Rabit", "ABC123"),("Roger","Rabbit", "ABC123"), ("Trevor","Philips","XYZ987"), ("Trevor","Philips","XYZ987")).toDF("first_name", "last_name", "record")
+----------+---------+------+
|first_name|last_name|record|
+----------+---------+------+
|Roger |Rabbit |ABC123|
|Roger |Rabit |ABC123|
|Roger |Rabbit |ABC123|
|Trevor |Philips |XYZ987|
|Trevor |Philips |XYZ987|
+----------+---------+------+
我想按record 列对这个数据框中的记录进行分组。然后我想在first_name 和last_name 字段中查找异常,对于具有相同record 值的所有记录,它们应该保持不变。
目前我发现的最佳方法是使用approx_count_distinct:
val wind_person = Window.partitionBy("record")
df1.withColumn("unique_fields",cconcat($"first_name",$"last_name"))
.withColumn("anomaly",capprox_count_distinct($"unique_fields") over wind_person)
.show(false)
+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|Roger |Rabbit |ABC123|RogerRabbit |2 |
|Roger |Rabbit |ABC123|RogerRabbit |2 |
|Roger |Rabit |ABC123|RogerRabit |2 |
|Trevor |Philips |XYZ987|TrevorPhilips|1 |
|Trevor |Philips |XYZ987|TrevorPhilips|1 |
+----------+---------+------+-------------+-------+
检测到异常的地方是anomaly 列大于 1。
问题在于approx_count_distinct 我们得到的只是一个近似值,我不确定我们有多大把握它总是会返回一个准确的计数。
一些额外的信息:
- Dataframe 可能包含超过 5 亿条记录
- Dataframe 之前已根据
record列重新分区 - 对于
record的每个不同值,不超过 15 行
在这种情况下以 100% 的准确率使用 approx_count_distinct 是否安全,或者 spark 中是否有更好的窗口函数来实现这一点?
【问题讨论】:
-
重现场景添加:import org.apache.spark.sql.functions.approx_count_distinct import org.apache.spark.sql.expressions.Window import spark.implicits._
标签: scala apache-spark apache-spark-sql