【问题标题】:User Defined Aggregate Function in PySpark SQLPySpark SQL 中用户定义的聚合函数
【发布时间】:2021-06-06 20:33:14
【问题描述】:

如何在 PySpark SQL 中实现用户定义的聚合函数 (UDAF)?

pyspark version = 3.0.2
python version = 3.7.10

作为一个最小的例子,我想用 UDAF 替换 AVG 聚合函数:

sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()

rv 将在哪里:

In [2]: rv
Out[2]:
   id  avg(value)
0   1         1.5
1   2         3.5

UDAF 如何替换查询中的AVG

例如这不起作用

import numpy as np
def udf_avg(x):
    return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()

这个想法是在纯 Python 中实现一个 UDAF,用于处理 SQL 聚合函数(例如低通滤波器)不支持的处理。

【问题讨论】:

标签: pandas apache-spark pyspark apache-spark-sql user-defined-functions


【解决方案1】:

您可以使用 GROUPED_AGG 类型的 Pandas UDF。它从 Spark 接收作为 Pandas Series 的列,以便您可以在该列上调用 Series.mean

import pyspark.sql.functions as F

@F.pandas_udf('float', F.PandasUDFType.GROUPED_AGG)  
def avg_udf(s):
    return s.mean()

df2 = df.groupBy('id').agg(avg_udf('value'))

df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+

也可以注册它以在 SQL 中使用:

df.createTempView('df')
spark.udf.register('avg_udf', avg_udf)

df2 = spark.sql("select id, avg_udf(value) from df group by id")
df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+

【讨论】:

  • 您提供的解决方案适用于3.0之前的Spark版本,见this link。 Pandas UDF 定义已从 Spark 3.0 更改为 Python 3.6+。这是触发的特定 UserWarning In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details
【解决方案2】:

可以使用 Pandas UDF,其定义与 Spark 3.0Python 3.6+ 兼容。详情请参阅issuedocumentation

Spark SQL 中的完整实现:​​

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')

@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
    return s.mean()
spark.udf.register('avg_udf', avg_udf)

rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()

有返回值

In [2]: rv
Out[2]:
   id  avg_udf(value)
0   1             1.5
1   2             3.5

【讨论】:

  • 我认为您的意思是FloatType,因为签名使用float,但除此之外,我的回答有了很好的改进:)
  • 如果您想避免弃用的功能,我建议您使用SparkSession 而不是(长)弃用的SQLContext
  • SparkSession 是更好的选择,感谢您指出这一点:) 关于FloatTypeDoubleType,两者都有效,但我认为后者是正确的实现,因为它是双精度的和float 一样。我们必须以不同的格式两次指定返回值类型,这似乎很不合情理。有谁明白这背后的原因吗?
  • 我不知道,但你可以使用字符串'double',这样可以节省导入和一些输入...
猜你喜欢
  • 1970-01-01
  • 2016-11-03
  • 1970-01-01
  • 2016-06-29
  • 2015-12-03
  • 2017-10-13
  • 2019-03-06
  • 2015-11-13
相关资源
最近更新 更多