【问题标题】:kafka to consume only new messageskafka 只消费新消息
【发布时间】:2015-07-17 04:31:56
【问题描述】:

我的 Spark 流式传输作业正在使用来自 Kafka 的数据

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);

每当我重新开始我的工作时,它就会从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要很长时间,如果我更改消费者组,它会立即使用新消息)

我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 时,都会从我离开的地方发送数据。

我的用例要求我忽略此数据并仅处理到达的数据。我怎样才能做到这一点? 任何建议

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming kafka-consumer-api


    【解决方案1】:

    有两种方法可以实现:

    1. 每次重启时创建一个唯一的消费者组,它将从最新的偏移量开始消费。

    2. 使用直接方法而不是基于接收器;在这里,您可以更好地控制您的消费方式,但必须手动更新 zookeeper 以存储您的偏移量。在下面的示例中,它将始终从最新的偏移量开始。

      import org.apache.spark.streaming.kafka._
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      

    这里有直接方法的文档:https://spark.apache.org/docs/latest/streaming-kafka-integration.html

    【讨论】:

    • 我发现了另一种方法 Spark 1.5(我测试过),使用 kafka 直接 api 并且不使用检查点。
    • 第三种方法是使用 seekToEnd() 如果您想手动转到主题的末尾。
    猜你喜欢
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 2022-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-11
    相关资源
    最近更新 更多