【问题标题】:Spark classnotfoundexception in UDFUDF 中的 Spark 类未发现异常
【发布时间】:2017-08-25 08:26:34
【问题描述】:

当我调用一个函数时,它会起作用。但是当我在 UDF 中调用该函数时将不起作用。

这是完整的代码。

val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext(sparkConf)
val hive = new org.apache.spark.sql.hive.HiveContext(sc)

///////////// UDFS
def toDoubleArrayFun(vec:Any) : scala.Array[Double] = {
  return vec.asInstanceOf[WrappedArray[Double]].toArray
}
def toDoubleArray=udf((vec:Any) => toDoubleArrayFun(vec))

//////////// PROCESS
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where word='soccer'")
println("==== test get value then transform")
println(df.head().get(0))
println(toDoubleArrayFun(df.head().get(0)))

println("==== test transform by udf")
df.withColumn("word_v", toDoubleArray(col("vec")))
.show(10);

然后这是输出。

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6e9484ad
hive: org.apache.spark.sql.hive.HiveContext = 
toDoubleArrayFun: (vec: Any)Array[Double]
toDoubleArray: org.apache.spark.sql.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [vec: array<double>]
==== test get value then transform
WrappedArray(-0.88675,, 0.0216657)
[D@4afcc447
==== test transform by udf
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, xdad008.band.nhnsystem.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$$5ba2a895f25683dd48fe725fd825a71$$$$$$iwC$$anonfun$toDoubleArray$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

这里的完整输出。 https://gist.github.com/jeesim2/efb52f12d6cd4c1b255fd0c917411370

如您所见,“toDoubleArrayFun”函数运行良好,但在 udf 中它声称 ClassNotFoundException。

我不能改变hive的数据结构,需要将vec转换为Array[Double]来做一个Vector实例。

那么上面的代码有什么问题呢?

Spark 版本是 1.6.1

更新 1

Hive 表的 'vec' 列类型是“array&lt;double&gt;

下面的代码也会出错

var df = hive.sql("select vec from mst_wordvector_tapi_128dim where 
word='hh'")
df.printSchema()
var word_vec = df.head().get(0)
println(word_vec)
println(Vectors.dense(word_vec))

输出

df: org.apache.spark.sql.DataFrame = [vec: array<double>]
root
|-- vec: array (nullable = true)
|    |-- element: double (containsNull = true)
==== test get value then transform
word_vec: Any = WrappedArray(-0.88675,...7)
<console>:288: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues:Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (Any)
println(Vectors.dense(word_vec))

这意味着不能将 hive 'array&lt;double&gt;' 列强制转换为 Array&lt;Double&gt; 其实我想计算距离:用两个array&lt;double&gt; 列加倍。 如何根据array&lt;double&gt;列添加Vector列?

典型的方法是

Vectors.sqrt(Vectors.dense(Array<Double>, Array<Double>)

【问题讨论】:

    标签: apache-spark apache-spark-sql user-defined-functions


    【解决方案1】:

    由于udf函数必须进行序列化和反序列化过程,any DataType 将不起作用。您必须定义要传递给udf 函数的列的确切数据类型。

    从您问题的输出来看,您的数据框中似乎只有一列,即vec,它属于Array[Double] 类型

    df: org.apache.spark.sql.DataFrame = [vec: array<double>]
    

    实际上不需要该 udf 函数,因为您的 vec 列已经是 Array 数据类型,这也是您的 udf 函数正在做的事情,即将值转换为 Array[Double]

    现在,您的其他函数调用正在运行

    println(toDoubleArrayFun(df.head().get(0)))
    

    因为不需要序列化和反序列化过程,它只是scala函数调用。

    【讨论】:

    • 啊,可序列化才是重点!但是我如何制作数据框列Array&lt;Double&gt;,而不是任何?我已经更新了问题!
    • 而不是 Any 在您的 udf 函数中,只需将数据类型定义为 WrappedArray[Double] 就可以了。 :)
    • 感谢您的友好回答。顺便说一句,当我将参数类型设置为 WrappedArray[Double] 而不是 Any 它失败了。 &lt;console&gt;:346: error: type mismatch; found : Any. required: scala.collection.mutable.WrappedArray[Double] println( Vectors.dense(toDoubleArrayFun(df.head().get(0))))
    • 我将不得不一步一步地看到你在做什么。如果您再次更新问题,那么问题将变得一团糟。所以我建议你问另一个问题,一步一步地处理你的过程和当前的问题。如果其他人不回答,我会尝试回答。如果这个答案对您有帮助,请接受并投票。 :) 谢谢
    猜你喜欢
    • 2021-12-07
    • 2020-11-14
    • 2020-11-13
    • 2010-12-11
    • 2016-03-06
    • 2014-04-22
    • 2015-09-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多