为了获得滚动的percent_rank(),您必须能够使用窗口框架定义来对您根本不能的功能进行排名。 (类似这样的w = Window.orderBy('t', 'x').rowsBetween(-sys.maxsize, 0))
我找到了一种解决方法,但它涉及一个非常昂贵的笛卡尔连接:
首先让我们创建示例数据框:
import pyspark.sql.functions as psf
from pyspark.sql import HiveContext
hc = HiveContext(sc)
df = hc.createDataFrame(sc.parallelize(zip(range(5), [1,3,5,4,2])), ['t', 'x'])
笛卡尔连接:
df2 = df.groupBy(df.x.alias('x2')).agg(psf.min("t").alias("t2"))
df_cross = df.join(df2).filter("t2 <= t").withColumn("isSup", (df.x > df2.x2).cast("int"))
+---+---+---+---+-----+
| t| x| t2| x2|isSup|
+---+---+---+---+-----+
| 1| 3| 0| 1| 1|
| 2| 5| 0| 1| 1|
| 2| 5| 1| 3| 1|
| 3| 4| 0| 1| 1|
| 3| 4| 1| 3| 1|
| 3| 4| 2| 5| 0|
| 4| 2| 0| 1| 1|
| 4| 2| 1| 3| 0|
| 4| 2| 2| 5| 0|
| 4| 2| 3| 4| 0|
+---+---+---+---+-----+
最后我们按't'、'x'分组:
df_fin = df_cross.groupBy("t", "x").agg(
psf.count("*").alias("count"),
psf.sum("isSup").alias("rank")
).withColumn('pct_rank_win', psf.col("rank")/psf.greatest(psf.col('count') - 1, psf.lit(1)))
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 5| 3| 2| 1.0|
| 3| 4| 4| 2|0.6666666666666666|
| 4| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+
df2 定义中的groupBy('x') 是为了确保密集排名(相同的值将具有相同的排名),如下例所示:
df = hc.createDataFrame(sc.parallelize(zip(range(6), [1,3,3,5,4,2])), ['t', 'x'])
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 3| 2| 1| 1.0|
| 3| 5| 3| 2| 1.0|
| 4| 4| 4| 2|0.6666666666666666|
| 5| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+