【问题标题】:Manually Commit offsetRanges to Kafka when using KafkaUtils.createDirectStream使用 KafkaUtils.createDirectStream 时手动将 offsetRanges 提交到 Kafka
【发布时间】:2017-12-05 18:25:03
【问题描述】:

我正在尝试使用 Kafka Utils Api 将数据从 Kafka (0.10.0.0) 消耗到 Spark (1.6.0) 流应用程序

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, inputTopicsSet)

要求是手动将偏移范围提交到 Kafka 本身。

请注意,在 java 中使用 KafkaConsumer(或 Consumer)对象时,我们可以在参数中设置 "enable.auto.commit" = "false" 后使用 commitAsync 或 commitSync 方法来实现。 p>

我无法弄清楚使用 KafkaUtils 时的方法。

【问题讨论】:

    标签: apache-kafka spark-streaming


    【解决方案1】:

    您可以将“enable.auto.commit”=“false”作为kafkaParams 的一部分传递。事实上,您可以传递任何 Kafka 消费者设置作为其中的一部分。

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("enable.auto.commit", true); 
        //more kafka params goes here if needed
        JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, inputTopicsSet)
    

    然后像这样手动提交偏移量,

    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
      // some time later, after outputs have completed
      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });
    

    参考:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#creating-a-direct-stream

    以上代码适用于spark 2.2.0,可能不适用于spark 1.6.0。

    【讨论】:

    • 感谢您的回复。但我的要求是手动提交而不是自动提交。我希望根据我的自定义要求用一些额外的逻辑覆盖提交代码并手动触发它(而不是自动),但我无法识别它
    • 另外,您提供的链接是 Spark 2.2.0。它使用以下包,该包不适用于低于 2.0.0 的版本——org.apache.spark.streaming.kafka010._
    • 我的错。虽然相同的链接提供了手动提交偏移量的信息,但同样它可能不适用于 spark 1.6.0。
    • 为使用 spark 2.x 的其他人更新了答案。
    猜你喜欢
    • 2017-09-10
    • 2016-09-04
    • 2019-08-27
    • 2021-03-21
    • 1970-01-01
    • 2018-02-07
    • 1970-01-01
    • 2020-09-08
    • 2017-10-07
    相关资源
    最近更新 更多