【问题标题】:How do I pass multiple arguments to a Pandas UDF in PySpark?如何在 PySpark 中将多个参数传递给 Pandas UDF?
【发布时间】:2021-08-26 05:54:42
【问题描述】:

我正在使用以下 sn-p:

from cape_privacy.pandas.transformations import Tokenizer

max_token_len = 5


@pandas_udf("string")

def Tokenize(column: pd.Series)-> pd.Series:
  tokenizer = Tokenizer(max_token_len)
  return tokenizer(column)


spark_df = spark_df.withColumn("name", Tokenize("name"))

由于 Pandas UDF 仅使用 Pandas 系列,我无法在函数调用 Tokenize("name") 中传递 max_token_len 参数。

因此我必须在函数范围之外定义max_token_len 参数。

this question 中提供的解决方法并不是很有帮助。 此问题是否有其他可能的解决方法或替代方法?

请指教

【问题讨论】:

    标签: python pandas pyspark user-defined-functions


    【解决方案1】:

    在尝试了无数方法后,我找到了一个不费吹灰之力的解决方案,如下图所示:

    我创建了一个 包装器 函数 (Tokenize_wrapper) 来包装 Pandas UDF (Tokenize_udf) 并使用返回 Pandas UDF 的包装器函数 em> 函数调用。

    def Tokenize_wrapper(column, max_token_len=10):
    
      @pandas_udf("string")
      def Tokenize_udf(column: pd.Series) -> pd.Series:
        tokenizer = Tokenizer(max_token_len)
        return tokenizer(column)
    
      return Tokenize_udf(column)
    
    
    
    df = df.withColumn("Name", Tokenize_wrapper("Name", max_token_len=5))
    

    使用部分函数(@Vaebhav 的回答)确实使这个问题的实现变得困难。

    【讨论】:

      【解决方案2】:

      您可以通过使用partial 并在您的UDF 签名中直接指定一个额外的argument(s) 来实现此目的

      数据准备

      input_list = [
                     (1,None,111)    
                     ,(1,None,120)
                    ,(1,None,121)
                    ,(1,None,124)
                    ,(1,'p1',125)
                    ,(1,None,126)
                    ,(1,None,146)
                    ,(1,None,147)
                   ]
      
      sparkDF = sql.createDataFrame(input_list,['id','p_id','timestamp'])
      
      sparkDF.show()
      
      +---+----+---------+
      | id|p_id|timestamp|
      +---+----+---------+
      |  1|null|      111|
      |  1|null|      120|
      |  1|null|      121|
      |  1|null|      124|
      |  1|  p1|      125|
      |  1|null|      126|
      |  1|null|      146|
      |  1|null|      147|
      +---+----+---------+
      
      

      部分

      
      def add_constant(inp,cnst=5):
          return inp + cnst
      
      
      cnst_add = 10
      
      partial_func = partial(add_constant,cnst=cnst_add)
      
      sparkDF = sparkDF.withColumn('Constant',partial_func(F.col('timestamp')))
                       
      sparkDF.show()
      
      +---+----+---------+----------------+
      | id|p_id|timestamp|Constant_Partial|
      +---+----+---------+----------------+
      |  1|null|      111|             121|
      |  1|null|      120|             130|
      |  1|null|      121|             131|
      |  1|null|      124|             134|
      |  1|  p1|      125|             135|
      |  1|null|      126|             136|
      |  1|null|      146|             156|
      |  1|null|      147|             157|
      +---+----+---------+----------------+
      
      

      UDF 签名

      cnst_add = 10
      
      add_constant_udf = F.udf(lambda x : add_constant(x,cnst_add),IntegerType())
      
      
      sparkDF = sparkDF.withColumn('Constant_UDF',add_constant_udf(F.col('timestamp')))
      
      sparkDF.show()
      
      +---+----+---------+------------+
      | id|p_id|timestamp|Constant_UDF|
      +---+----+---------+------------+
      |  1|null|      111|         121|
      |  1|null|      120|         130|
      |  1|null|      121|         131|
      |  1|null|      124|         134|
      |  1|  p1|      125|         135|
      |  1|null|      126|         136|
      |  1|null|      146|         156|
      |  1|null|      147|         157|
      +---+----+---------+------------+
      
      

      同样,您可以如下转换您的功能 -

      from functools import partial
      
      max_token_len = 5
      
      def Tokenize(column: pd.Series,max_token_len=10)-> pd.Series:
        tokenizer = Tokenizer(max_token_len)
        return tokenizer(column)
      
      Tokenize_udf = F.udf(lambda x : Tokenize(x,max_token_len),StringType())
      
      Tokenize_partial = partial(Tokenize,max_token_len=max_token_len)
      
      spark_df = spark_df.withColumn("name", Tokenize_udf("name"))
      spark_df = spark_df.withColumn("name", Tokenize_partial("name"))
      
      

      【讨论】:

      • 我看到您提供的答案是关于UDF。答案是否也适用于Pandas UDF
      • F.udf 中的F 是什么?
      • NameError: name 'add_constant' is not defined
      • 更新了答案,添加了缺少的函数def
      • import pyspark.sql.functions as F
      猜你喜欢
      • 2020-03-31
      • 1970-01-01
      • 1970-01-01
      • 2018-06-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-02
      • 2017-07-21
      相关资源
      最近更新 更多