【Question Title】: ExceptionInInitializerError Spark Streaming Kafka
【Posted】: 2018-12-01 16:37:37
【Question Description】:

I am trying to connect Spark Streaming to Kafka in a simple application. I built the application from the example in the Spark documentation. When I try to run it, I get this exception:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:59)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
    at producer.KafkaProducer$.main(KafkaProducer.scala:36)
    at producer.KafkaProducer.main(KafkaProducer.scala)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.4
    at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
    at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
    at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
    at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)

Here is my code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaProducer {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
    val topics = Array("topic1", "topic2")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // DStream transformations are lazy; an output operation such as print()
    // is needed for the map to actually run.
    lines.map(_.key()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
I am not sure whether the problem is in the configuration or in the code itself. This is what my build.sbt file looks like:

scalaVersion := "2.11.4"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "1.1.0",
  "org.apache.spark" %% "spark-core" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-streaming" % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)

I would appreciate any help, because I have no idea what is going wrong!

【Comments】:

Tags: scala apache-spark apache-kafka spark-streaming-kafka


【Solution 1】:

    Following the stack trace of the exception you hit, the root cause is:

    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.4

    Indeed,

    Spark 2.1.0 includes com.fasterxml.jackson.core as a transitive dependency. So, we do not need to include it in libraryDependencies.

    A similar problem and its solution are described in more detail here.
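    In other words, some other dependency is dragging in Jackson 2.9.4, which is newer than the Jackson that Spark 2.3.0 was built against. One common workaround is to pin Jackson in build.sbt so the whole build agrees on one version. The sketch below assumes Spark 2.3.x was built against Jackson 2.6.7 / jackson-module-scala 2.6.7.1; verify the versions actually in play with `sbt evicted` before relying on it:

    ```scala
    // build.sbt (sketch, sbt 1.x syntax; on sbt 0.13 dependencyOverrides is a Set).
    // Forces all Jackson modules to the version Spark 2.3.x expects -- the exact
    // version numbers here are an assumption, check them against `sbt evicted`.
    dependencyOverrides ++= Seq(
      "com.fasterxml.jackson.core"    % "jackson-core"           % "2.6.7",
      "com.fasterxml.jackson.core"    % "jackson-databind"       % "2.6.7.1",
      "com.fasterxml.jackson.module" %% "jackson-module-scala"   % "2.6.7.1"
    )
    ```

    Alternatively, removing the dependency that pulls in Jackson 2.9.x (or excluding its Jackson artifacts) achieves the same effect.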

    【Discussion】:
