【问题标题】:Alternative ways to apply a user defined aggregate function in pyspark在 pyspark 中应用用户定义的聚合函数的替代方法
【发布时间】:2018-01-29 12:04:08
【问题描述】:

我正在尝试将用户定义的聚合函数应用于 spark 数据帧,以应用附加平滑,请参见下面的代码:

import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, col, collect_list, concat_ws, udf

try:
    sc
except NameError:
    sc = ps.SparkContext()
    sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([['A', 1],
                            ['A',1],
                            ['A',0],
                            ['B',0],
                            ['B',0],
                            ['B',1]], schema=['name', 'val'])


def smooth_mean(x):
    return (sum(x)+5)/(len(x)+5)

smooth_mean_udf = udf(smooth_mean)

df.groupBy('name').agg(collect_list('val').alias('val'))\
.withColumn('val', smooth_mean_udf('val')).show()

这样做有意义吗?据我了解,这不能很好地扩展,因为我使用的是udf。我也找不到collect_list的确切工作方式,名称中的collect部分似乎表明数据被“收集”到边缘节点,但我假设数据被“收集”到各个节点?

提前感谢您的任何反馈。

【问题讨论】:

    标签: python apache-spark pyspark user-defined-functions


    【解决方案1】:

    据我了解,这无法扩展

    您的理解是正确的,这里最大的问题是collect_list 哪个is just good old groupByKey。 Python udf 的影响要小得多,但对于简单的算术运算,使用它没有意义。

    只需使用标准聚合

    from pyspark.sql.functions import sum as sum_, count
    
    (df
        .groupBy("name")
        .agg(((sum_("val") + 5) / (count("val") + 5)).alias("val"))
        .show())
    
    # +----+-----+
    # |name|  val|
    # +----+-----+
    # |   B| 0.75|
    # |   A|0.875|
    # +----+-----+
    

    【讨论】:

    • 一个后续问题;你怎么知道 collect_list 使用了groupByKey?如果答案很长/很复杂,我可以将其作为一个新问题发布在 SO 上。
    猜你喜欢
    • 2021-06-06
    • 2016-11-03
    • 2015-12-03
    • 2017-10-13
    • 2019-03-06
    • 2023-03-26
    • 2018-05-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多