【问题标题】:Spark UDF as function parameter, UDF is not in function scopeSpark UDF 作为函数参数,UDF 不在函数范围内
【发布时间】:2017-02-08 21:03:13
【问题描述】:

我想将一些 UDF 作为函数参数与数据帧一起传递。

执行此操作的一种方法可能是在函数中创建 UDF,但这会创建和销毁 UDF 的多个实例而不重用它,这可能不是解决此问题的最佳方法。

这是一段示例代码 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

val df =   inputDF1
    .withColumn("new_col", lkpUDF(col("c1")))
val df2 =   inputDF2.
  .withColumn("new_col", lkpUDF(col("c1")))

理想情况下,我不想做上述事情,而是想做这样的事情 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = {

    df
      .withColumn("new_col", lkpUDF(col("c1")))

  }
val df = appendCols(inputDF, lkpUDF)

上面的 UDF 非常简单,但在我的例子中,它可以返回原始类型或用户定义的案例类类型。任何想法/指针将不胜感激。谢谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe udf


    【解决方案1】:

    具有适当签名的函数必须是这样的:

    import org.apache.spark.sql.UserDefinedFunction
    
    def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = {
        df.withColumn("new_col", func(col("col1")))
    }
    

    scala REPL 在返回初始化值的类型方面非常有帮助。

    scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
    lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))
    

    此外,如果您传递到 udf 包装器的函数签名包含 Any 返回类型(如果函数可以返回原始类型或用户定义的案例类,则属于这种情况), UDF 将无法编译,并出现如下异常:

    java.lang.UnsupportedOperationException: Schema for type Any is not supported
    

    【讨论】:

    猜你喜欢
    • 2016-12-02
    • 1970-01-01
    • 2016-11-12
    • 1970-01-01
    • 1970-01-01
    • 2020-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多