【问题标题】:How to add a udf to sqlContext in pyspark如何在pyspark中将udf添加到sqlContext
【发布时间】:2018-04-14 01:24:16
【问题描述】:

我知道我可以将 Python 函数注册为 UDF 并在 SQL 查询中使用它:

def example(s):
    return len(s)
sqlContext.udf.register("example_udf", example)
spark.sql("SELECT example_udf(col) FROM data")

或者我可以用 udf 包装 Python 函数,这样它就可以应用于数据帧:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
example_udf = udf(example)
data.select(example_udf('col'))

在我的例子中,因为我需要将一些其他参数传递给 UDF,所以我为 UDF 构建了一个嵌套函数:

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def my_udf(other_par)
    def example(s):
        return len(s) == other_par
    return udf(example, BooleanType())

dataframe.select(...).where(my_udf(5)(col('col')))

现在我已经有了一个 UDF,我可以将它应用到数据帧上。但我也想在 spark.sql 中使用它,就像第一个块中的 SQL 查询一样,而不是数据框的 select 或 where 方法。所以我想知道我该怎么做。看起来sqlContext.udf.register 只能接受 Python 函数而不是 UDF。

【问题讨论】:

    标签: python apache-spark user-defined-functions


    【解决方案1】:

    如果您使用最新最好的 (2.3),请不要直接使用 udf

    def my_udf(other_par, spark):
        def _(s):
            return len(s) == other_par
        return spark.udf.register("my_udf_{}".format(other_par), _, BooleanType())
    
    my_udf_42 = my_udf(42, spark)
    
    spark.sql("SELECT my_udf_42(array(1, 2))").show()
    # +----------------------+
    # |my_udf_42(array(1, 2))|
    # +----------------------+
    # |                 false|
    # +----------------------+
    
    spark.createDataFrame([([1] * 42, )], ("id", )).select(my_udf_42("id")).show()
    # +-------------+
    # |my_udf_42(id)|
    # +-------------+
    # |         true|
    # +-------------+
    

    否则直接调用注册副作用:

    def my_udf(other_par, spark):
        def _(s):
            return len(s) == other_par
        name = "my_udf_{}".format(other_par)
        spark.udf.register(name, _, BooleanType())
        return udf(_, BooleanType())
    
    my_udf_0 = my_udf(0, spark)
    
    spark.sql("SELECT my_udf_0(array())").show()
    # +-----------------+
    # |my_udf_0(array())|
    # +-----------------+
    # |             true|
    # +-----------------+
    

    当然,像udf 这样的简单操作不应该做,但我认为这只是一个玩具示例。如果没有,

    from pyspark.sql.functions import size, length
    
    size("some_col") == 42
    length("some_col") == 42
    

    是更好的选择。

    【讨论】:

      猜你喜欢
      • 2016-07-28
      • 1970-01-01
      • 1970-01-01
      • 2015-09-25
      • 2022-11-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-21
      相关资源
      最近更新 更多