【Question】: Spark Streaming/Kafka classes not found
【Posted】: 2017-04-28 05:11:59
【Description】:

I am trying to create a Spark StreamingContext to stream messages from a Kafka topic, so I added the following dependency to my build:

"org.apache.spark:spark-streaming-kafka_2.10:1.6.2"
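(Editor's note: for the Spark 1.6 line, the Kafka integration artifact normally needs to sit alongside matching `spark-core` and `spark-streaming` versions with the same Scala suffix. A sketch of the full dependency set; the exact coordinates beyond the one quoted above are an assumption:)

```
// Assumed full dependency set for Spark 1.6.2 on Scala 2.10 -- the
// spark-streaming-kafka artifact must match the spark-core/spark-streaming
// version and Scala binary version exactly:
"org.apache.spark:spark-core_2.10:1.6.2"
"org.apache.spark:spark-streaming_2.10:1.6.2"
"org.apache.spark:spark-streaming-kafka_2.10:1.6.2"
```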

Then I created the following class:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingApp {
    def main(args: Array[String]): Unit = {
        def messageConsumer(): StreamingContext = {
            val topicName : String = "my-topic"
            val brokerHostAndPort : String = "mykafka.example.com:9092"

            val ssc = new StreamingContext(SparkContext.getOrCreate(), Seconds(10))

            createKafkaStream(ssc, topicName, brokerHostAndPort).foreachRDD(rdd => {
                rdd.foreach { msg =>
                    // TODO: Implement message processing here.
                }
            })

            ssc
        }

        StreamingContext.getActive.foreach {
            _.stop(stopSparkContext = false)
        }

        val ssc = StreamingContext.getActiveOrCreate(messageConsumer)
        ssc.start()
        ssc.awaitTermination()
    }

    def createKafkaStream(ssc: StreamingContext,
            kafkaTopics: String, brokers: String): DStream[(String, String)] = {
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> brokers,
            "key.deserializer" -> "StringDeserializer",
            "value.deserializer" -> "StringDeserializer",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> "false"
        )        

        KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
            ssc, kafkaParams, Set(kafkaTopics))
    }
}

When I compile this (via Ant, though that shouldn't matter), I get scalac compiler errors:

[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:11: error: not found: object kafka
[scalac] import kafka.serializer.StringDecoder
[scalac]        ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:12: error: object kafka is not a member of package org.apache.spark.streaming
[scalac] import org.apache.spark.streaming.kafka.KafkaUtils
[scalac]                                   ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:63: error: not found: value KafkaUtils
[scalac]         KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, Set(kafkaTopics))
[scalac]         ^
[scalac] three errors found

Am I missing a dependency here? Using the wrong one? Or is there a mistake in the code?


Update:

Interestingly, when I change the dependency to:

"org.apache.spark:spark-streaming-kafka_2.10:1.6.1"

these compiler errors disappear...
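(Editor's note: independent of the dependency problem, the `kafkaParams` in the question mix new-consumer keys (`bootstrap.servers`, `key.deserializer`, `earliest`/`latest` offsets) with the 0.8-era direct-stream API, where deserialization is driven by the `StringDecoder` type parameters instead. A hedged sketch of the parameter map the 0.8 API conventionally expects; the broker address is taken from the question:)

```scala
// Sketch (assumption): parameter keys for the 0.8 direct-stream API.
// "metadata.broker.list" is the conventional 0.8 key for the broker list,
// and offset reset values are "smallest"/"largest" rather than
// "earliest"/"latest". The key/value deserializer entries from the
// question are not needed -- StringDecoder handles that.
val brokerHostAndPort = "mykafka.example.com:9092" // from the question
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokerHostAndPort,
  "auto.offset.reset" -> "largest"
)
```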

【Comments】:

    Tags: scala apache-spark apache-kafka spark-streaming


    【Solution 1】:

    The artifact ID of the Kafka dependency should look like this:

    spark-streaming-kafka-0-8_2.11
    

    Hope this works for you.
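    (Editor's note: the `-0-8` artifact naming in this answer applies to the Spark 2.x line, where the Kafka integration was split into separate modules per Kafka version. A sketch of the coordinates; the Spark version number here is an assumption:)

    ```
    // Assumed Spark 2.x coordinates (Scala 2.11). "0-8" targets the old
    // Kafka 0.8 consumer API (matches the StringDecoder-based code in the
    // question); "0-10" targets the newer Kafka consumer API:
    "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0"
    "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0"
    ```

    For the Spark 1.6.x line the questioner is on, the un-suffixed artifact `spark-streaming-kafka_2.10` is the correct name, so switching to `-0-8_2.11` also implies upgrading Spark and the Scala binary version together.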

    【Discussion】:
