【问题标题】:Spark sqlContext UDF acting on SetsSpark sqlContext UDF 作用于 Set
【发布时间】:2016-09-01 21:16:34
【问题描述】:

我一直在尝试定义一个在 Spark 的 DataFrame 中工作的函数,该函数将 scala 集作为输入并输出一个整数。我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set

这是一个简单的代码,给出了问题的症结:

// generate sample data
case class Dummy( x:Array[Integer] )
val df = sqlContext.createDataFrame(Seq(
  Dummy(Array(1,2,3)),
  Dummy(Array(10,20,30,40))
))

// define the UDF
import org.apache.spark.sql.functions._
def setSize(A:Set[Integer]):Integer = {
  A.size
}
// For some reason I couldn't get it to work without this valued function
val sizeWrap: (Set[Integer] => Integer) = setSize(_)
val sizeUDF = udf(sizeWrap)

// this produces the error
df.withColumn("colSize", sizeUDF('x)).show

我在这里缺少什么?我怎样才能让它工作?我知道我可以通过强制转换为 RDD 来做到这一点,但我不想在 RDD 和 DataFrames 之间来回切换。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    使用Seq:

    val sizeUDF = udf((x: Seq) =>  setSize(x.toSet))
    

    【讨论】:

    • 谢谢。有用。我还设法概括为 2 组作为输入(这是我最初的需要)
    猜你喜欢
    • 2015-05-26
    • 2016-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多