【问题标题】:Apache Kafka: How to receive latest message from Kafka?Apache Kafka:如何接收来自 Kafka 的最新消息?
【发布时间】:2018-02-08 13:37:11
【问题描述】:

我正在使用 Scala 中的 Spark 在 Kafka 消费者应用程序中消费和处理消息。有时,处理来自 Kafka 消息队列的消息所需的时间比平时多一点。这时我需要消费最新的消息,忽略生产者已经发布但尚未消费的较早消息。

这是我的消费者代码:

object KafkaSparkConsumer extends MessageProcessor {

def main(args: scala.Array[String]): Unit = {
  val properties = readProperties()

  val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
  val ssc = new StreamingContext(streamConf, Seconds(1))

  val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
  val kafkaParams = Map("metadata.broker.list"         ->  properties.getProperty("broker_connection_str"), 
                      "zookeeper.connect"              ->  properties.getProperty("zookeeper_connection_str"), 
                      "group.id"                       ->  group_id, 
                      "auto.offset.reset"              ->  properties.getProperty("offset_reset"),
                      "zookeeper.session.timeout"      ->  properties.getProperty("zookeeper_timeout"))

  val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
                      ssc,
                      kafkaParams,
                      Map("moved_object" -> 1),
                      StorageLevel.MEMORY_ONLY_SER
                      ).map(_._2)

  msgStream.foreachRDD { x =>  
    x.foreach {  
      msg => println("Message: "+msg)
      processMessage(msg)
    }    
  }                         
  ssc.start()
  ssc.awaitTermination()
  }  
}

有什么方法可以确保消费者始终获得消费者应用程序中的最新消息?或者我是否需要在 Kafka 配置中设置任何属性来实现相同的效果?

对此的任何帮助将不胜感激。谢谢

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming kafka-consumer-api


    【解决方案1】:

    您始终可以在连接到 Kafka 时生成一个新的(随机)组 id - 这样您将在连接时开始使用新消息。

    【讨论】:

    • 每次开始执行消费者应用程序时,我都会生成随机组 ID。它以这种方式获取最新消息,但如果处理需要更多时间,它会继续处理我不需要的旧消息。
    【解决方案2】:

    Kafka消费者API包含方法

    void seekToEnd(Collection<TopicPartition> partitions)
    

    因此,您可以从消费者那里获得分配的分区,并寻找所有分区到最后。 seekToBeginning 也有类似的方法。

    【讨论】:

    • 我已经在问题描述中发布了我的代码。你能看看它并建议我在哪里添加这个方法吗?谢谢你的回答:)
    • 你对 auto.offset.reset 有什么价值?
    • 它已设置为“最大”,它会自动将值重置为最大偏移量。
    【解决方案3】:

    您可以利用两个 KafkaConsumer API 从分区中获取最后一条消息(假设日志压缩不会成为问题):

    1. public Map&lt;TopicPartition, Long&gt; endOffsets(Collection&lt;TopicPartition&gt; partitions):这为您提供了给定分区的结束偏移量。请注意,结束偏移量是要传递的下一条消息的偏移量。
    2. public void seek(TopicPartition partition, long offset):为每个分区运行此命令,并提供其与上述调用的结束偏移量减 1(假设它大于 0)。

    【讨论】:

    • 我已经在问题描述中发布了我的代码。你能看看它并建议我在哪里添加这个方法吗?谢谢你的回答:)
    【解决方案4】:

    是的,您可以将 staringOffset 设置为 latest 以使用最新消息。

    val spark = SparkSession
      .builder
      .appName("kafka-reading")
      .getOrCreate()
    
    import spark.implicits._
     val df = spark
             .readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")
             .option("startingOffsets", "latest")
             .option("subscribe", topicName)
             .load()
    

    【讨论】:

    • 我已经在问题描述中发布了我的代码。你能看看它并建议我在哪里添加你的代码吗?谢谢你的回答:)
    • 定义kafkaParams时需要添加这个属性。 consumer.forcefromstart=false 有关更多信息,您可以查看消费者属性。 github.com/dibbhatt/kafka-spark-consumer
    • 好的,马赫什。我需要几天的时间来检查它。我会尽快给您回复。再次感谢您。
    • 我试过了,马赫什。尽管如此,我还是按顺序收到所有消息,而不仅仅是最新消息。
    • 是这个 "auto.offset.reset" -> "latest" 的值吗?
    猜你喜欢
    • 1970-01-01
    • 2015-05-20
    • 1970-01-01
    • 2020-01-10
    • 1970-01-01
    • 2017-08-14
    • 2022-07-28
    • 2019-01-23
    • 1970-01-01
    相关资源
    最近更新 更多