【问题标题】:Spark Streaming + Kafka: how to check name of topic from kafka messageSpark Streaming + Kafka:如何从 kafka 消息中检查主题名称
【发布时间】:2017-08-30 18:28:37
【问题描述】:

我正在使用 Spark Streaming 从 Kafka 主题列表中读取数据。 我正在关注link 的官方 API。我使用的方法是:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topics)

我想知道执行者将如何从主题列表中读取消息?他们的政策是什么?他们会阅读一个主题,然后在他们完成后将消息传递给其他主题吗?

最重要的是,在调用这个方法之后,我怎样才能在 RDD 中查看消息的主题是什么?

stream.foreachRDD(rdd => rdd.map(t => {
        val key = t._1
        val json = t._2
        val topic = ???
})

【问题讨论】:

  • 你可以使用 map 像 var records = stream.map(record => (record.topic))
  • @israel.zinc 我认为stream 中的元素是Tuple2[String,String]。没有称为主题的参数或方法

标签: scala apache-kafka spark-streaming


【解决方案1】:

我想知道执行者将如何读取来自 主题列表?他们的政策是什么?他们会阅读一个主题吗? 那么当他们完成消息传递给其他主题时?

在直接流式处理方法中,驱动程序负责将偏移量读取到您要使用的 Kafka 主题中。它的作用是创建主题、分区和需要读取的偏移量之间的映射。之后,驱动程序为每个工作人员分配一个 range 以读取特定的 Kafka 主题。这意味着,如果单个 worker 可以同时运行 2 个任务(仅出于示例的目的,它通常可以运行更多),那么它可能会同时从 Kafka 的两个不同主题中读取。

我怎样才能在调用这个方法后,检查一个主题是什么? RDD 中的消息?

您可以使用createDirectStream 的重载,它采用MessageHandler[K, V]

val topicsToPartitions: Map[TopicAndPartition, Long] = ???

val stream: DStream[(String, String)] = 
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, 
        kafkaParams, 
        topicsToPartitions,
        mam: MessageAndMetadata[String, String]) => (mam.topic(), mam.message())

【讨论】:

  • 谢谢@Yuval,但仍然如此。从 Kafka 阅读时如何访问消息和主题。以messageHandler 作为createDirectStream 的参数,看起来我无论如何都做不到。
  • @salvob 我的代码 sn-p 正是这样做的。输出将是DStream[(String, String)],其中第一个是主题名称。
  • 您的代码定义了一个流,该流应该包含其中每条记录的消息和主题。但是当我尝试打印元组的内容时(在我的问题中使用 sn-p - 使用 println(key + topic + message) ,没有任何反应。rdd.count() 方法虽然正确返回了消息数
  • @salvob 你试过我的 sn-p 了吗?
  • 是的!不同之处在于topicsToPartitions 需要是一个地图。我拥有的流是InputDStream,甚至没有 rdd.collect 向我显示消息
猜你喜欢
  • 1970-01-01
  • 2018-01-09
  • 2019-07-12
  • 2019-06-24
  • 1970-01-01
  • 2017-10-30
  • 2017-08-08
  • 2018-06-25
  • 2015-02-04
相关资源
最近更新 更多