【问题标题】:How to commit an offset on java flink with KafkaSource如何使用 KafkaSource 在 java flink 上提交偏移量
【发布时间】:2022-06-17 02:42:28
【问题描述】:

我想处理来自 kafka 的消息,然后提交该消息,一旦 flink 消费和处理所有消息结束作业,使用 taskmanager 和心跳升级进程

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(address)
            .setTopics(inputTopic)
            .setGroupId(consumerGroup)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperty("enable.auto.commit", "true")
            .setProperty("commit.offsets.on.checkpoint", "true")
            .build();

    DataStream<String> stream = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    ObjectMapper mapper = new ObjectMapper();
    stream.map((value) -> {

【问题讨论】:

  • 请提供足够的代码,以便其他人更好地理解或重现问题。

标签: java apache-kafka apache-flink


【解决方案1】:

如果您希望停止作业,则应将其设置为批处理作业,而不是流作业。更多信息:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/.

至于向 kafka broker 提交记录,在每个成功的 checkpoint/savepoint 上由 flink 自动完成,因此您无需在这方面做任何事情。

【讨论】:

    猜你喜欢
    • 2021-04-06
    • 1970-01-01
    • 2017-06-24
    • 2015-10-03
    • 2022-10-04
    • 2019-05-20
    • 2017-12-10
    • 2020-01-27
    • 1970-01-01
    相关资源
    最近更新 更多