【问题标题】:UDF function taking much timeUDF 函数需要很长时间
【发布时间】:2021-08-02 05:13:20
【问题描述】:

我有一个这样的 DataFrame:

SCORE = spark.createDataFrame(
    [
      ('a', "Joe", 1),
      ('b', "Doe", 2),
      ('c', "Carl", 3),
      ('d', "CJ", 4),
      ('e', "Tom", 5),
    ], 
      StructType(
        [
            StructField("id", StringType(), False),
            StructField("user", StringType(), False),
            StructField("score", IntegerType(), False),
        ]
    )
)
id user score
a Joe 1
b Doe 2
c Carl 3
d CJ 4
e Tom 5

我编写了一个 UDF 来计算 percentile_score,它基于整个 score 列。它正在工作,正在生成一个名为 percentile_score 的新列:

from pyspark.sql.functions import udf, collect_list


def calculate_percentile(user_score, score_list):
    data_prs_score.sort()
    scores_count = len(score_list)

    cumulative_frequency = 0
    frequency = 0
    for score in score_list:
        if score == user_score:
            frequency += 1
        elif score > user_score:
            break

        cumulative_frequency += 1

    return (cumulative_frequency - (0.5 * frequency)) / scores_count


def make_score_list(score_list):
     return udf(lambda user_score: calculate_percentile(user_score, score_list), FloatType())
SCORE.withColumn('percentile_score', make_prs_score_list(SCORE.select(collect_list('score')).collect()[0][0])(col('score'))).show()

我的问题是,这个函数需要 1 小时才能运行。

我认为花费这么长时间的原因是我在 UDF 上使用了 collect()。但是,我看不到另一种构建它的方法。

所以我想知道我可以在这里做什么样的优化。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    您可以使用percent_rank 计算每一行的百分位数:

    from pyspark.sql import functions as F
    
    SCORE.withColumn("percentile_score", 
      F.percent_rank().over(Window.orderBy("score"))) \
      .show()
    

    打印

    +---+----+-----+----------------+                                               
    | id|user|score|percentile_score|
    +---+----+-----+----------------+
    |  a| Joe|    1|             0.0|
    |  b| Doe|    2|            0.25|
    |  c|Carl|    3|             0.5|
    |  d|  CJ|    4|            0.75|
    |  e| Tom|    5|             1.0|
    +---+----+-----+----------------+
    

    但是会出现以下警告:

    WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
    

    就像警告所说的那样,所有数据都将被收集到一个分区中,因此您(暂时)失去了 Spark 的并行性。但很有可能这种方法仍然比 UDF 更快。

    【讨论】:

      猜你喜欢
      • 2020-08-04
      • 2022-01-12
      • 1970-01-01
      • 2013-09-07
      • 2020-08-26
      • 2014-10-09
      • 2012-11-26
      • 2019-12-27
      • 2017-10-22
      相关资源
      最近更新 更多