【发布时间】:2016-12-28 19:37:06
【问题描述】:
我想使用 Scala 2.10.6 和 Spark 1.6.2 使用来自 Kafka 主题的消息。对于 Kafka,我正在使用此依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
这段代码编译得很好,但是我想定义auto.offset.reset,问题就出现了:
val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
当我添加kafkaParams时,它不再编译:
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")
val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
错误信息:
94: error: missing parameter type for expanded function ((x$3) => x$3._2)
[ERROR] StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
我尝试了createStream 的许多不同参数组合,但一切都失败了。有人可以帮忙吗?
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming