【发布时间】:2021-12-20 11:06:39
【问题描述】:
运行此代码时出现不可序列化错误:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
def findHighestRatingUsers(movieRating: String): (String) = {
val tokens = movieRating.split(",", -1)
val movieTitle = tokens(0)
val ratings = tokens.slice(1, tokens.size)
val maxRating = ratings.max
var userIds = ArrayBuffer[Int]()
for(i <- 0 until ratings.length){
if (ratings(i) == maxRating) {
userIds += (i+1)
}
}
return movieTitle + "," + userIds.mkString(",")
return movieTitle
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Task 1")
val sc = new SparkContext(conf)
val Lines = sc.textFile(args(0))
val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
.saveAsTextFile(args(1))
}
}
错误发生在以下行:
val TitleAndMaxUserIds =Lines.map(findHighestRatingUsers)
.saveAsTextFile(args(1))
我相信这是由于函数“findHighestRatingUsers”中的某些原因造成的。有人可以解释为什么以及如何解决它吗?
异常中的更多信息如下:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.map(RDD.scala:395)
at Task1$.main(Task1.scala:63)
at Task1.main(Task1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Task1$
Serialization stack:
- object not serializable (class: Task1$, value: Task1$@3c770db4)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Task1$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Task1$.$anonfun$main$1:(LTask1$;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class Task1$$$Lambda$1023/20408451, Task1$$$Lambda$1023/20408451@4f59a516)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 22 more
我查看了这篇文章 Difference between object and class in Scala 并尝试使用对象来封装函数:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Function{
def _findHighestRatingUsers(movieRating: String): (String) = {
val tokens = movieRating.split(",", -1)
val movieTitle = tokens(0)
val ratings = tokens.slice(1, tokens.size)
val maxRating = ratings.max
var userIds = ArrayBuffer[Int]()
for(i <- 0 until ratings.length){
if (ratings(i) == maxRating) {
userIds += (i+1)
}
}
return movieTitle + "," + userIds.mkString(",")
}
}
object Task1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Task 1")
val sc = new SparkContext(conf)
val textFile = sc.textFile(args(0))
val output = textFile.map(Function._findHighestRatingUsers)
.saveAsTextFile(args(1))
}
}
但仍然出现异常,出现大量错误...
这次我尝试将对象函数放在对象task1中,如下所示:
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
object Function{
def _findHighestRatingUsers(movieRating: String): (String) = {
val tokens = movieRating.split(",", -1)
val movieTitle = tokens(0)
val ratings = tokens.slice(1, tokens.size)
val maxRating = ratings.max
var userIds = ArrayBuffer[Int]()
for(i <- 0 until ratings.length){
if (ratings(i) == maxRating) {
userIds += (i+1)
}
}
return movieTitle + "," + userIds.mkString(",")
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Task 1")
val sc = new SparkContext(conf)
val textFile = sc.textFile(args(0))
val output = textFile.map(Function._findHighestRatingUsers)
.saveAsTextFile(args(1))
}
}
问题解决了。但是我还是不知道为什么嵌套对象解决了这个问题。有人可以解释一下吗? 此外,我有几点不确定:
- scala 中的主要功能是什么?是节目的入口吗?
- 为什么要用对象来描述主函数?
- 谁能给出一个包含函数、类或一些基本组件的 Scala 程序的通用结构?
【问题讨论】:
-
你能发布完整的堆栈跟踪吗?您是否尝试将方法
findHighestRatingUsers移动到与包含 main 方法的对象不同的对象中? -
您使用的是什么版本的 Java?
-
我已经在@GaëlJ 上面发布了错误。我不知道这些错误的含义。
-
它的 java8 @tjheslin1
-
什么异常。你在第二个代码示例中得到了堆栈跟踪吗?
标签: scala apache-spark