【问题标题】:How to commit manually with Kafka Stream?如何使用 Kafka Stream 手动提交?
【发布时间】:2017-09-10 23:41:56
【问题描述】:

有没有办法使用 Kafka Stream 手动提交?

通常使用KafkaConsumer,我会执行以下操作:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}

我在哪里手动调用提交。我没有看到 KStream 的类似 API。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    提交由 Streams 在内部自动处理,因此通常没有理由手动提交。请注意,Streams 处理此问题的方式与消费者自动提交不同——事实上,内部使用的消费者禁用了自动提交,而 Streams 会“手动”管理提交。原因是,提交只能在处理过程中的某些点发生,以确保不会丢失数据(在更新状态和刷新结果方面存在许多内部依赖性)。

    对于更频繁的提交,您可以通过StreamsConfig 参数commit.interval.ms 减少提交间隔。

    不过,手动提交可以通过低级处理器 API 间接进行。您可以使用通过init() 方法提供的context 对象来调用context#commit()。请注意,这只是尽快提交的“对 Streams 的请求”——它不是直接发出提交。

    【讨论】:

    • 如果出现异常并且流应用程序崩溃会发生什么。然后再次启动流应用程序将导致使用相同的消息并且循环将继续,直到我们删除主题并重新创建它。
    • 是的,如果你的应用程序崩溃重新启动,它会从最近提交的偏移量(类似于KafkaConsumer,实际上是内部使用)继续处理。 -- 不确定您所说的“直到我们删除该主题并重新创建它”是什么意思?提交偏移量与删除/重新创建主题有何关系?
    • 问题是我正在阅读特定消息,其中包含特殊字符,并且使用此消息导致我的流应用程序崩溃。再次,当应用程序启动时,它消耗了相同的消息并崩溃并继续循环。我的观点是我们能否在捕获异常时不手动提交消息并继续处理下一条消息
    • 不在应用程序内。根据何时您的应用程序遇到问题,您可以使用DeserializationExceptionHandler:docs.confluent.io/current/streams/developer-guide/… -- 或者您可以捕获异常并“吞下”它。
    • 谢谢现在知道了!!非常感谢?
    猜你喜欢
    • 2019-08-27
    • 2020-08-31
    • 1970-01-01
    • 2016-09-04
    • 2018-02-07
    • 1970-01-01
    • 1970-01-01
    • 2021-03-21
    • 1970-01-01
    相关资源
    最近更新 更多