【发布时间】:2021-02-03 17:16:01
【问题描述】:
我的最终产品需要在一周中的每一天与当天活动最多的地方排成一行。即 Mon Place A 56、Wed Place C 64 等。我尝试过使用 Window 功能并使用 max 和 groupie,但我让自己感到困惑。
【问题讨论】:
标签: scala apache-spark pyspark apache-spark-sql
我的最终产品需要在一周中的每一天与当天活动最多的地方排成一行。即 Mon Place A 56、Wed Place C 64 等。我尝试过使用 Window 功能并使用 max 和 groupie,但我让自己感到困惑。
【问题讨论】:
标签: scala apache-spark pyspark apache-spark-sql
出于您的目的,您需要写window function:
val df = Seq(
("Mon", "Place A", 10),
("Mon", "Place B", 42),
("Wed", "Place C", 41),
("Thurs", "Place D", 45),
("Fri", "Place E", 64),
("Fri", "Place A", 12),
("Wed", "Place F", 54),
("Wed", "Place A", 1)
).toDF("day", "place", "number")
df.show()
df.withColumn("orderedNumberForDay",
row_number()
.over(
Window.orderBy(col("number").desc)
.partitionBy("day")
)
).filter(col("orderedNumberForDay") === lit(1))
.select("day", "place", "number")
.show()
/*
+-----+-------+------+ +-----+-------+------+
| day| place|number| | day| place|number|
+-----+-------+------+ +-----+-------+------+
| Mon|Place A| 10| | Mon|Place B| 42|
| Mon|Place B| 42| ===>> | Wed|Place F| 54|
| Wed|Place C| 41| | Fri|Place E| 64|
|Thurs|Place D| 45| |Thurs|Place D| 45|
| Fri|Place E| 64| +-----+-------+------+
| Fri|Place A| 12|
| Wed|Place F| 54|
| Wed|Place A| 1|
+-----+-------+------+
*/
简单解释一下它是如何工作的
首先你需要添加带有window function结果的列,这里是:
df.withColumn("orderedNumberForDay",
row_number()
.over(
Window.orderBy(col("number").desc)
.partitionBy("day")
)
)
row_number() - 是partition 内的行计数器。 Partition 就像group by 中的组。 partitionBy("day") 只是将具有相同 day 列值的窗口分组。最后,我们必须在desc 订单中通过number 订购window,所以我们的window function 中有orderBy(col("number").desc。 over 就像一座桥梁,从 windows 到 windows 内部的一些有用计算,它只是绑定 row_number 和 window function。
执行完这个阶段我们会有数据:
+-----+-------+------+-------------------+
| day| place|number|orderedNumberForDay|
+-----+-------+------+-------------------+
| Mon|Place B| 42| 1|
| Mon|Place A| 10| 2|
| Wed|Place F| 54| 1|
| Wed|Place C| 41| 2|
| Wed|Place A| 1| 3|
| Fri|Place E| 64| 1|
| Fri|Place A| 12| 2|
|Thurs|Place D| 45| 1|
+-----+-------+------+-------------------+
所以,我们只需要filter 行与orderedNumberForDay 等于1 - 它将与max number 并选择开始列:day, place, number。最终结果将是:
+-----+-------+------+
| day| place|number|
+-----+-------+------+
| Mon|Place B| 42|
| Wed|Place F| 54|
| Fri|Place E| 64|
|Thurs|Place D| 45|
+-----+-------+------+
【讨论】:
Spark 3.0 引入了聚合函数 max_by,它可以满足您的需求:
df.groupBy("day")
.agg(expr("max_by(place, number)"), max('number))
.show()
结果:
+-----+---------------------+-----------+
| day|max_by(place, number)|max(number)|
+-----+---------------------+-----------+
| Mon| Place B| 42|
| Wed| Place F| 54|
| Fri| Place E| 64|
|Thurs| Place D| 45|
+-----+---------------------+-----------+
【讨论】: