【发布时间】:2017-08-30 18:28:37
【问题描述】:
我正在使用 Spark Streaming 从 Kafka 主题列表中读取数据。 我正在关注link 的官方 API。我使用的方法是:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
我想知道执行者将如何从主题列表中读取消息?他们的政策是什么?他们会阅读一个主题,然后在他们完成后将消息传递给其他主题吗?
最重要的是,在调用这个方法之后,我怎样才能在 RDD 中查看消息的主题是什么?
stream.foreachRDD(rdd => rdd.map(t => {
val key = t._1
val json = t._2
val topic = ???
})
【问题讨论】:
-
你可以使用 map 像 var records = stream.map(record => (record.topic))
-
@israel.zinc 我认为
stream中的元素是Tuple2[String,String]。没有称为主题的参数或方法
标签: scala apache-kafka spark-streaming