【问题标题】:Cannot compile Kafka consumer in Scala无法在 Scala 中编译 Kafka 消费者
【发布时间】:2016-12-28 19:37:06
【问题描述】:

我想使用 Scala 2.10.6 和 Spark 1.6.2 使用来自 Kafka 主题的消息。对于 Kafka,我正在使用此依赖项:

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming_2.10</artifactId>
     <version>1.6.2</version>
</dependency>

这段代码编译得很好,但是我想定义auto.offset.reset,问题就出现了:

val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
                             StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

当我添加kafkaParams时,它不再编译:

val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")

val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap,
                                 StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

错误信息:

94: error: missing parameter type for expanded function ((x$3) => x$3._2)
[ERROR]                                                 StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

我尝试了createStream 的许多不同参数组合,但一切都失败了。有人可以帮忙吗?

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming


    【解决方案1】:

    您需要将类型参数添加到KafkaUtils.createStream 以使其解析流的基础类型。例如,如果您的键和值的类型为String

    val data: DStream[String] =
      KafkaUtils
        .createStream[String, String, StringDecoder, StringDecoder](
           ssc,
           kafkaParams,
           topicMap,
           StorageLevel.MEMORY_AND_DISK_SER_2
      ).map(_._2)
    

    【讨论】:

    • 好的,让我检查一下。确实问题是我想使用来自远程 Kafka 队列的消息。我可以使用curl 命令和终端的confluent API 来获取它们。但如果我运行 Scala 代码,我不会得到它们。所以,我的假设是我需要指定偏移量。
    • @Dinosaurius 您问题中的错误与偏移量无关。只是编译器无法推断出正确的类型。
    • 是的,我知道。我只是想解释一下为什么我需要设置偏移量。
    • 代码现在编译。谢谢。附言我知道这与发布的问题无关,但是如果您能猜想为什么当我在 Scala 中使用此代码时没有出现 Kafka 消息(没有给出错误),我真的很感激。有没有可能我使用curl 从终端消费了它们。哪些是可能影响的参数(除了偏移量)? IP和端口是一样的。也许,组?
    • @Dinosaurius 这是你的完整代码吗?意思是这就是你用StreamingContext所做的一切吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-17
    • 2019-11-21
    • 1970-01-01
    • 1970-01-01
    • 2019-03-14
    • 1970-01-01
    • 2018-10-31
    相关资源
    最近更新 更多