【问题标题】:Spark UDF - Task not serializable exceptionSpark UDF - 任务不可序列化异常
【发布时间】:2019-02-05 22:30:02
【问题描述】:

我正在尝试使用以下 scala 代码创建 UDF

lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))

val validateudf = udf((id: Int) => {

  if(id == 1){
     "ID IS EQUAL TO 1"
  } 
  else if(id > 1){
    validateId(id)
  }
  else{
    "NO VALID RECORDS"
  }
})

def validateId(id:Int) : String = {
   if (id > 2) {
     "ID IS GREATER THAN 2"
   }
   else {
     "VALID RECORDS"
   }
 }

当我运行这段代码时,我得到了任务不可序列化的异常。

有什么想法吗?谢谢。

【问题讨论】:

  • 我们需要更多关于异常的细节 - stacktrace 等
  • 线程“主”org.apache.spark.SparkException 中的异常:在 org.apache 的 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 中的任务不可序列化。 spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  • 是完整的代码吗?似乎您的代码中有一些闭包。使用UDF时,要注意闭包。
  • 将您的 validateId 方法移到 validateudf 中,它应该会有所帮助(如果它是完整的代码)

标签: scala spark-dataframe


【解决方案1】:

udf 被视为一个黑盒,需要对传递的列进行序列化和反序列化,因此当您有内置函数的替代方案时,不建议使用udf

withColumn 调用udf 函数没问题,但是您从导致问题的udf 函数内部调用了另一个函数validateId

我建议你完全不要使用udf函数,因为你可以通过使用when内置函数来达到要求。

import org.apache.spark.sql.functions._
val formattedDF2 = df.withColumn("result_col", when($"id" === 1, lit("ID IS EQUAL TO 1")).otherwise(when($"id" > 2, lit("ID IS GREATER THAN 2")).otherwise(when($"id" > 1, lit("VALID RECORDS")).otherwise(lit("NO VALID RECORDS")))))

【讨论】:

  • 感谢您的回复,但我计划在 validateId 方法内的 id 列上实施一些其他验证。
  • @user7693124 您可以将它们添加到 when 列表中 :) 感谢您的支持和接受
猜你喜欢
  • 2016-07-10
  • 2020-02-04
  • 1970-01-01
  • 1970-01-01
  • 2018-04-06
  • 2021-03-16
  • 1970-01-01
  • 2016-08-16
  • 2015-12-16
相关资源
最近更新 更多