【发布时间】:2015-08-06 19:29:35
【问题描述】:
我在使用 updateStateByKey() 函数时遇到问题。我有以下简单的代码(基于书籍编写:“Learning Spark - Lighting Fast Data Analysis”):
object hello {
def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
Some(runningCount.getOrElse(0) + newValues.size)
}
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
val ssc = new StreamingContext(conf, Seconds(4))
ssc.checkpoint("/")
val lines7 = ssc.socketTextStream("localhost", 9997)
val keyValueLine7 = lines7.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
ssc.start()
ssc.awaitTermination()
}
}
我的 build.sbt 是:
name := "stream-correlator-spark"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided"
)
当我使用sbt assembly 命令构建它时,一切正常。当我以本地模式在 spark 集群上运行它时,出现错误:
线程“main”中的异常 java.lang.NoClassDefFoundError: org/apache/spark/streaming/dstream/DStream$ 在你好$.main(helo.scala:25) ...
第 25 行是:
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
我觉得这可能是一些兼容性版本问题,但我不知道可能是什么原因以及如何解决这个问题。
非常感谢您的帮助!
【问题讨论】:
-
我检查了使用
sbt assembly创建的 jar 文件,它不包含任何与 spark 或 spark 流相关的 jar(或类)。sbt assembly不应该包含在任何 JVM 上运行所需的所有源代码吗?
标签: scala apache-spark sbt spark-streaming sbt-assembly