【发布时间】:2019-02-05 22:30:02
【问题描述】:
我正在尝试使用以下 scala 代码创建 UDF
lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))
val validateudf = udf((id: Int) => {
if(id == 1){
"ID IS EQUAL TO 1"
}
else if(id > 1){
validateId(id)
}
else{
"NO VALID RECORDS"
}
})
def validateId(id:Int) : String = {
if (id > 2) {
"ID IS GREATER THAN 2"
}
else {
"VALID RECORDS"
}
}
当我运行这段代码时,我得到了任务不可序列化的异常。
有什么想法吗?谢谢。
【问题讨论】:
-
我们需要更多关于异常的细节 - stacktrace 等
-
线程“主”org.apache.spark.SparkException 中的异常:在 org.apache 的 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 中的任务不可序列化。 spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
-
是完整的代码吗?似乎您的代码中有一些闭包。使用UDF时,要注意闭包。
-
将您的 validateId 方法移到 validateudf 中,它应该会有所帮助(如果它是完整的代码)
标签: scala spark-dataframe