【发布时间】:2021-08-02 05:13:20
【问题描述】:
我有一个这样的 DataFrame:
SCORE = spark.createDataFrame(
[
('a', "Joe", 1),
('b', "Doe", 2),
('c', "Carl", 3),
('d', "CJ", 4),
('e', "Tom", 5),
],
StructType(
[
StructField("id", StringType(), False),
StructField("user", StringType(), False),
StructField("score", IntegerType(), False),
]
)
)
| id | user | score |
|---|---|---|
| a | Joe | 1 |
| b | Doe | 2 |
| c | Carl | 3 |
| d | CJ | 4 |
| e | Tom | 5 |
我编写了一个 UDF 来计算 percentile_score,它基于整个 score 列。它正在工作,正在生成一个名为 percentile_score 的新列:
from pyspark.sql.functions import udf, collect_list
def calculate_percentile(user_score, score_list):
data_prs_score.sort()
scores_count = len(score_list)
cumulative_frequency = 0
frequency = 0
for score in score_list:
if score == user_score:
frequency += 1
elif score > user_score:
break
cumulative_frequency += 1
return (cumulative_frequency - (0.5 * frequency)) / scores_count
def make_score_list(score_list):
return udf(lambda user_score: calculate_percentile(user_score, score_list), FloatType())
SCORE.withColumn('percentile_score', make_prs_score_list(SCORE.select(collect_list('score')).collect()[0][0])(col('score'))).show()
我的问题是,这个函数需要 1 小时才能运行。
我认为花费这么长时间的原因是我在 UDF 上使用了 collect()。但是,我看不到另一种构建它的方法。
所以我想知道我可以在这里做什么样的优化。
【问题讨论】:
标签: python apache-spark pyspark