测试数据:
from pyspark.sql import SparkSession, functions as F, Window
spark = SparkSession.builder.getOrCreate()
df = spark.range(12).withColumn(
'balance',
F.when(~F.col('id').isin([0, 1, 2, 3, 4]), F.col('id') + 500))
df.show()
#+---+-------+
#| id|balance|
#+---+-------+
#| 0| null|
#| 1| null|
#| 2| null|
#| 3| null|
#| 4| null|
#| 5| 505|
#| 6| 506|
#| 7| 507|
#| 8| 508|
#| 9| 509|
#| 10| 510|
#| 11| 511|
#+---+-------+
percent_rank 会给你精确的百分位数 - 结果可能有很长的十进制值。这就是为什么单独percent_rank 可能无法提供您想要的。
null_value = F.col('balance').isNull()
df = (
df.filter(~null_value)
.withColumn('percentile', F.percent_rank().over(Window.orderBy('balance')))
.unionByName(df.filter(null_value), True)
)
df.show()
#+---+-------+-------------------+
#| id|balance| percentile|
#+---+-------+-------------------+
#| 5| 505| 0.0|
#| 6| 506|0.16666666666666666|
#| 7| 507| 0.3333333333333333|
#| 8| 508| 0.5|
#| 9| 509| 0.6666666666666666|
#| 10| 510| 0.8333333333333334|
#| 11| 511| 1.0|
#| 0| null| null|
#| 1| null| null|
#| 2| null| null|
#| 3| null| null|
#| 4| null| null|
#+---+-------+-------------------+
以下应该有效。添加了舍入步骤。
null_value = F.col('balance').isNull()
pr = F.percent_rank().over(Window.orderBy('balance'))
df = (
df.filter(~null_value)
.withColumn('bucket', F.when(pr == 0, 1).otherwise(F.ceil(pr * 100)))
.unionByName(df.filter(null_value), True)
)
df.show()
#+---+-------+------+
#| id|balance|bucket|
#+---+-------+------+
#| 5| 505| 1|
#| 6| 506| 17|
#| 7| 507| 34|
#| 8| 508| 50|
#| 9| 509| 67|
#| 10| 510| 84|
#| 11| 511| 100|
#| 0| null| null|
#| 1| null| null|
#| 2| null| null|
#| 3| null| null|
#| 4| null| null|
#+---+-------+------+
您也可以考虑ntile。每个值都被添加到 n 个“桶”之一中。
当 n=100 时(测试表少于 100 项,因此只有第一个“桶”获取值):
null_value = F.col('balance').isNull()
df = (
df.filter(~null_value)
.withColumn('ntile_100', F.ntile(100).over(Window.orderBy('balance')))
.unionByName(df.filter(null_value), True)
)
df.show()
#+---+-------+---------+
#| id|balance|ntile_100|
#+---+-------+---------+
#| 5| 505| 1|
#| 6| 506| 2|
#| 7| 507| 3|
#| 8| 508| 4|
#| 9| 509| 5|
#| 10| 510| 6|
#| 11| 511| 7|
#| 0| null| null|
#| 1| null| null|
#| 2| null| null|
#| 3| null| null|
#| 4| null| null|
#+---+-------+---------+
当 n=4 时:
null_value = F.col('balance').isNull()
df = (
df.filter(~null_value)
.withColumn('ntile_100', F.ntile(4).over(Window.orderBy('balance')))
.unionByName(df.filter(null_value), True)
)
df.show()
#+---+-------+---------+
#| id|balance|ntile_100|
#+---+-------+---------+
#| 5| 505| 1|
#| 6| 506| 1|
#| 7| 507| 2|
#| 8| 508| 2|
#| 9| 509| 3|
#| 10| 510| 3|
#| 11| 511| 4|
#| 0| null| null|
#| 1| null| null|
#| 2| null| null|
#| 3| null| null|
#| 4| null| null|
#+---+-------+---------+