【问题标题】:Spark streaming Job aborted due to stage failure when reading from kafka topic从 kafka 主题读取时,由于阶段失败,Spark 流作业中止
【发布时间】:2017-11-10 00:15:12
【问题描述】:

我是 spark 和 kafka 的新手,我正在使用 spark 流处理来自 kafka 主题的数据。现在,我只想在控制台中打印记录。 我在两个节点(scala 版本 2.12.2 和 spark-2.1.1)和一个带有 kafka 的节点(版本 kafka_2.11-0.10.2.0)上有一个带有 spark 的迷你集群。 但是,当我提交代码时,出现此错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 1.3.64.64, executor 1): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:193)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这与版本有关吗?或者也许我的代码不正确!

这是我的代码:

import java.util.UUID
import org.apache.kafka.clients.consumer.ConsumerRecord
import runtime.ScalaRunTime.stringOf
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


object followProduction {

def main(args: Array[String]) = {

val sparkConf = new SparkConf().setMaster("spark://<real adress here : 10. ...>:7077").setAppName("followProcess")
val streamContext = new StreamingContext(sparkConf, Seconds(2))

streamContext.checkpoint("checkpoint")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "1.3.64.66:9094",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> s"${UUID.randomUUID().toString}",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
  streamContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.print()

//stream.map(record => (record.key, record.value)).count().print()

streamContext.start()
streamContext.awaitTermination()
}
}

这是我构建的:

name := "test"
version := "1.0"
scalaVersion := "2.12.2"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "2.1.1" %"provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "2.1.1" %"provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.0.0"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

我们将不胜感激,感谢您的宝贵时间。

【问题讨论】:

    标签: scala apache-spark streaming apache-kafka spark-streaming


    【解决方案1】:

    Spark 2.1.x 是针对 Scala 2.11 而不是 2.12 编译的。

    试试:

    scalaVersion := 2.11.11
    

    任何 2.11.x 版本都应该可以工作。

    此外,当您需要 2.11 时,您的 Kafka 流式依赖项指的是 Scala 2.10:

    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.1"
    

    【讨论】:

    • 感谢您的回复,但我在更改 scala 版本后仍然遇到同样的错误
    • @Asmaas 您是否在集群节点中安装了 Scala 2.11?
    • 是的,我做到了。当我运行'scala -version'时我得到了这个:Scala code runner version 2.11.11 -- Copyright 2002-2017, LAMP/EPFL
    • @Asmaam 查看我的更新,您还有另一个版本不匹配。
    • 确实是scala版本的问题。我使用了 Scala 2.10.6 并且成功了!非常感谢您的见解
    【解决方案2】:

    除了您的版本不匹配之外,我认为您正在运行 Spark 集群,您需要将所有 JARS(库)从您运行 Spark 驱动程序的实际应用程序提交到 Spark 从机(节点)。

    您可以使用.setJars(libs) 方法提交jarsSparkConf

    类似的东西

    lazy val conf: SparkConf = new SparkConf()
        .setMaster(sparkMaster)
        .setAppName(sparkAppName)
        .set("spark.app.id", sparkAppId)
        .set("spark.submit.deployMode", "cluster")
        .setJars(libs) //setting jars for sparkContext
    

    注意: libs: Seq[String] 即库路径序列

    【讨论】:

    • 感谢您的回复,我会调查一下
    猜你喜欢
    • 1970-01-01
    • 2019-11-23
    • 1970-01-01
    • 1970-01-01
    • 2016-10-27
    • 2017-11-24
    • 2019-09-24
    • 1970-01-01
    • 2014-09-10
    相关资源
    最近更新 更多