【问题标题】:Spark Streaming - Is it possible to consume a specific partition of Kafka Topic?Spark Streaming - 是否可以使用 Kafka 主题的特定分区?
【发布时间】:2017-02-22 19:45:29
【问题描述】:

我正在尝试使用 Spark Streaming 使用 Kafka 主题的特定分区。

我在KafkaUtils 类中看不到此用例的任何方法。

有一个方法叫createRDD,它基本上是期待offsets,它只对非流应用有用。有没有其他方法可以使用 Spark Streaming 消费 Kafka 主题的特定分区?

【问题讨论】:

    标签: apache-kafka spark-streaming


    【解决方案1】:

    没有办法使用单个分区,我们可以使用的最细粒度的是一个主题。但是,有一种方法可以指定给定消息来自特定分区。您可以在使用 createDirectStream 的重载时执行此操作,该重载采用 Function1[MessageAndMetadata, R]

    例如,假设我们有一个String 类型的键和消息,并且我们目前只从一个主题消费。我们可以这样做:

    val topicAndPartition: Map[TopicAndPartition, Long] = ???
    val kafkaProperties: Map[String, String] = ???
    
    KafkaUtils.createDirectStream[String,
                                  String, 
                                  StringDecoder,
                                  StringDecoder,
                                  (String, String)](
            streamingContext,
            kafkaConfig.properties,
            topicAndPartition,
            (mam: MessageAndMetadata[String, String]) =>
              (mam.partition, mam.message())
    

    这样,我输出了一个分区 (1) 和底层消息 (2) 的元组。然后,我可以过滤此DStream[(String, String)] 以仅包含来自特定分区的消息:

    val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }
    

    如果我们从多个主题消费,我们需要输出一个主题和分区的元组,以便使用正确的主题过滤分区。幸运的是,我们已经可以使用一个方便的案例类TopicAndPartition。我们有:

    (mam: MessageAndMetadata[String, String]) => 
      (TopicAndPartition(mam.topic(), mam.partition()), mam.message())
    

    然后:

    val filteredStream = kafkaDStream.filter { 
       case (tap, _) => tap.topic == "mytopic" && tap.partition == 4 
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-09-26
      • 2016-06-09
      • 2019-04-03
      • 2019-07-12
      • 1970-01-01
      • 2016-10-01
      • 1970-01-01
      • 2019-10-15
      相关资源
      最近更新 更多