【发布时间】:2023-03-26 01:56:01
【问题描述】:
我想要实现的目标相当简单:我想检查所有 ID(uuid)是否经历了某种“状态”(行为状态)。如果有,则将与该 ID 关联的所有记录返回给我。例如,如果以下 ID 之一的状态为“三”,我希望保留与该 ID 关联的所有记录。到目前为止,我可以通过以下两种方式实现这一点:
// first method
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))
// second method
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))
上述两种方法适用于我正在处理的示例数据,但是当我开始增加要处理的数据量时遇到了一些性能问题,因为我可能有数百万到数亿个 ID我需要过滤。是否有更有效的方法来执行上述操作,还是只是增加我正在使用的硬件?
以下是示例数据和预期输出。
val df = Seq(
("1234", "one"),
("1234", "two"),
("1234", "three"),
("234", "one"),
("234", "one"),
("234", "two")
).toDF("id", "status")
df.show
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
| 234| one|
| 234| one|
| 234| two|
+----+------+
dfTransformed.show()
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
+----+------+
【问题讨论】:
标签: apache-spark apache-spark-sql