【问题标题】:Unable to commit consuming offsets to Kafka on checkpoint in Flink new Kafka consumer-api (1.14)无法在 Flink 新 Kafka 消费者 API (1.14) 的检查点上向 Kafka 提交消费偏移量
【发布时间】:2021-12-16 07:29:52
【问题描述】:

我指的是 Kafka 源连接器的 Flink 1.14 版本,代码如下。

我期待以下要求。

  • 在应用程序刚开始时,必须从 Kafka 主题中读取最新的偏移量
  • 在检查点上,它必须将消耗的偏移量提交给 Kafka
  • 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中进行选择,并且应该必须消耗消费者滞后和此后的新事件源。

使用 Flink 新的 KafkaConsumer API (KafkaSource) 我面临以下问题

  • 能够满足上述要求,但无法在检查点(500 毫秒)上提交消耗的偏移量。它宁可在 2 秒或 3 秒后提交。

当您在 2s/3s 内手动终止应用程序并重新启动时。由于最后一条消费的消息没有提交,它被读取了两次(重复)。

为了交叉检查这个功能,我尝试过使用 Flink Kafka 的旧消费者 API (FlinkKafkaConsumer)。在那里它完美地工作。当消息被立即消费时,它会被提交回 Kafka。

遵循的步骤

  • 设置 Kafka 环境
  • 运行flink下面的代码来消费。代码包括新旧 API。这两个 API 都将从 Kafka 主题消费并在控制台打印
  • 向 Kafka 主题推送一些消息。
  • 在推送一些消息并在控制台中显示之后终止 Flink 作业。
  • 检查两个 API 的 kafka 消费者组。与旧的消费者 api 的 group-id(older_test1) 相比,新的 flink 消费者 api 的 group-id(test1) 消费者滞后 > 0。
  • 重新启动 Flink 作业时,您可以从新的 Flink kafka-consumer API 看到这些未提交的消息在控制台中可见,从而导致重复消息。

如果我缺少任何东西或需要添加任何属性,请提出建议。

 @Test
    public void test() throws Exception {

        System.out.println("FlinkKafkaStreamsTest started ..");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(500);
        env.setParallelism(4);

        Properties propertiesOld = new Properties();
        Properties properties = new Properties();
        String inputTopic = "input_topic";
        String bootStrapServers = "localhost:29092";
        String groupId_older = "older_test1";
        String groupId = "test1";

        propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
        propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        /******************** Old Kafka API **************/
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
                new KRecordDes(),
                propertiesOld);
        flinkKafkaConsumer.setStartFromGroupOffsets();
        env.addSource(flinkKafkaConsumer).print("old-api");


        /******************** New Kafka API **************/
        KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));

        KafkaSource<String> kafkaSource = sourceBuilder.build();

        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

        source.print("new-api");

        env.execute();
    }
    static class KRecordDes implements  KafkaDeserializationSchema<String>{
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            return new String(consumerRecord.value());
        }
    }

注意:我还有其他要求,我希望在相同的代码中使用 Flink Kafka 有界源阅读器,这在新的 API(KafkaSource)中可用。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    来自Kafka Source的文档:

    请注意,Kafka 源确实依赖已提交的偏移量 容错性。提交偏移量只是为了暴露进度 用于监控的消费者和消费组。

    当 Flink 作业从失败中恢复时,它不会使用 broker 上提交的偏移量,而是从最近一次成功的检查点恢复状态,并从该检查点中存储的偏移量继续消费,因此检查点之后的记录将被“重放” “ 一点点。由于您使用的是不支持完全一次语义的打印接收器,因此您将看到重复的记录,这些记录实际上是最近一次成功检查点之后的记录。

    你提到的offset commit的2-3秒延迟,是因为SourceReaderBase的实现。简而言之,SplitFetcher 管理一个任务队列,当一个偏移提交任务被推入队列时,它不会被执行,直到调用 KafkaConsumer#poll() 的正在运行的 fetch 任务超时。如果流量很小,延迟可能会更长。但请注意,这不会影响正确性:KafkaSource 不使用提交的偏移量来进行容错。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-11-13
      • 2017-08-22
      • 1970-01-01
      • 1970-01-01
      • 2021-11-10
      • 2022-11-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多