【Question title】: java.lang.ClassNotFoundException occurs when I use the map function of a DataFrame on Spark
【Posted】: 2018-04-24 15:50:45
【Question】:

I have a DataFrame "orderedDf" with the following schema:

root
|-- schoolID: string (nullable = true)
|-- count(studentID): long (nullable = false)
|-- count(teacherID): long (nullable = false)
|-- sum(size): long (nullable = true)
|-- sum(documentCount): long (nullable = true)
|-- avg_totalScore: double (nullable = true)

Here is the data in "orderedDf":

+--------+----------------+----------------+---------+------------------+--------------+
|schoolID|count(studentID)|count(teacherID)|sum(size)|sum(documentCount)|avg_totalScore|
+--------+----------------+----------------+---------+------------------+--------------+
|school03|               2|               2|      195|               314|         100.0|
|school02|               2|               2|      193|               330|          94.5|
|school01|               2|               2|      294|               285|          83.4|
|school04|               2|               2|      263|               415|          72.5|
|school05|               2|               2|      263|               415|          62.5|
|school07|               2|               2|      263|               415|          52.5|
|school09|               2|               2|      263|               415|          49.8|
|school08|               2|               2|      263|               415|          42.3|
|school06|               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+

As we can see, the "avg_totalScore" column is sorted in descending order. My problem is that I want to divide all the rows into three groups, like this:

+--------+----------------+----------------+---------+------------------+--------------+
|schoolID|count(studentID)|count(teacherID)|sum(size)|sum(documentCount)|avg_totalScore|
+--------+----------------+----------------+---------+------------------+--------------+
|great   |               2|               2|      195|               314|         100.0|
|great   |               2|               2|      193|               330|          94.5|
|great   |               2|               2|      294|               285|          83.4|
|good    |               2|               2|      263|               415|          72.5|
|good    |               2|               2|      263|               415|          62.5|
|good    |               2|               2|      263|               415|          52.5|
|bad     |               2|               2|      263|               415|          49.8|
|bad     |               2|               2|      263|               415|          42.3|
|bad     |               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+

In other words, I want to divide the schools into three groups by their "avg_totalScore": great schools, good schools, and bad schools, in a 3:3:3 ratio.
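The 3:3:3 split described above can be written as a pure function of a row's position in the sorted data. The following is only a minimal sketch in plain Scala, not the asker's code: `GroupByRankSketch` and `groupFor` are hypothetical names, the rank is assumed to be zero-based, and integer arithmetic stands in for the floor and ceil of one third and two thirds of the row count.

```scala
object GroupByRankSketch {
  // Label for the row at zero-based position `rank` among `total` rows
  // that are already sorted by avg_totalScore in descending order.
  def groupFor(rank: Long, total: Long): String = {
    val firstSplit = total / 3             // floor(total / 3)
    val secondSplit = (2 * total + 2) / 3  // ceil(2 * total / 3)
    if (rank < firstSplit) "great"
    else if (rank < secondSplit) "good"
    else "bad"
  }

  def main(args: Array[String]): Unit = {
    val total = 9L
    // Print the label of every position 0..8 in order.
    println((0L until total).map(groupFor(_, total)).mkString(", "))
  }
}
```

In a Spark job, one way to obtain such a rank is `RDD.zipWithIndex`, which pairs each element with its index in the RDD's ordering. Whether that fits depends on how the data is sorted and partitioned.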

My solution is as follows:

val num = orderedDf.count()
val first_split_num = math.floor(num * (1.0/3))
val second_split_num = math.ceil(num * (2.0/3))
val accumu = SparkContext.getOrCreate(Configuration.getSparkConf).accumulator(0, "Group Num")
val rdd = orderedDf.map(row => {
  val group = {
    accumu match {
      case a: Accumulator[Int] if a.value <= first_split_num => "great"
      case b: Accumulator[Int] if b.value <= second_split_num => "good"
      case _ => "bad"
    }
  }
  accumu += 1
  Row(group, row(1), row(2), row(3), row(4), row(5), row(6))
})

val result = sqlContext.createDataFrame(rdd,orderedDf.schema)

The code above runs without any exception, but when I call either of the following:

result.collect().foreach(println)

result.show()

I get a ClassNotFoundException, and I don't know why. Can anyone help me? Thank you very much!

Here are the details of the exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44.0 (TID 3644, node1): java.lang.ClassNotFoundException: com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

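【Comments】:

  • The specific class that was not found is com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1. Given that you have not shown how orderedDf is implemented, there is a good chance the problem lies there.
  • Thank you very much. I guess the problem is in the map function: map is a transformation, which is lazy, so nothing fails until an action is executed. But I don't know what is wrong with the map function.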

Tags: scala apache-spark spark-dataframe


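【Answer 1】:

As a Spark newbie, I just ran into this problem myself. It looks like you have not actually shipped the jar containing your class to the executor nodes, so when you run an action on the (distributed) DataFrame, the executors cannot run the code because they cannot find the class.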
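One way to ship the jar from the driver program is to list it in the `SparkConf` before the context is created. This is a rough sketch, not a confirmed fix for the question: `ShipJarSketch`, the application name, and the jar path are all hypothetical placeholders, and it assumes the application classes have already been packaged into a jar.

```scala
import org.apache.spark.SparkConf

object ShipJarSketch {
  // Hypothetical path: replace it with the jar produced by your own build.
  val appJar = "/path/to/comparative-analysis.jar"

  // Build a SparkConf that lists the application jar, so that Spark
  // distributes it to the executors when the SparkContext starts.
  def buildConf(): SparkConf =
    new SparkConf(false) // false: do not load spark.* system properties
      .setAppName("ComparativeAnalysis")
      .setJars(Seq(appJar))

  def main(args: Array[String]): Unit =
    // setJars stores the list under the "spark.jars" key.
    println(buildConf().get("spark.jars"))
}
```

Launching the packaged application with `spark-submit`, or calling `SparkContext.addJar` on an existing context, are other ways to get the classes onto the executors.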

【Comments】:

【Answer 2】:
    java.lang.ClassNotFoundException: com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1
    

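According to the exception, the class loader could not load the class above. Could you provide more information on how this class is used in your code?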

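【Comments】:

  • Thank you very much. The code posted above is inside this class, so I guess the problem is not the class itself but the anonymous function inside it.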
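
To illustrate the point about the anonymous function: the Scala compiler turns a function literal into its own generated class, named after the enclosing class, and that generated class is what an executor has to load when it deserializes a task. A small sketch in plain Scala, with `ClosureNameSketch` and `addOne` as hypothetical names; the exact generated name depends on the Scala version, so no specific output is claimed here.

```scala
object ClosureNameSketch {
  // A function literal, comparable to the one passed to map in the question.
  val addOne: Int => Int = x => x + 1

  def main(args: Array[String]): Unit = {
    // The runtime class of the function value is generated by the compiler;
    // its name is derived from the enclosing object or class.
    println(addOne.getClass.getName)
  }
}
```

Older Scala compilers emit names of the form `Enclosing$$anonfun$1`, which matches the shape of the class name in the stack trace. So the missing class is most likely the closure passed to `map`, and it lives in the same jar as `ComparativeAnalysisBusiness`.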