【问题标题】:kafka how to implement exactly-once message delivery logic with topic/partition/offsetkafka如何使用topic/partition/offset实现exactly-once消息传递逻辑
【发布时间】:2017-06-06 13:37:41
【问题描述】:

我设法在使用@KafkaListener 注释的方法中获取主题/分区/偏移量,但我如何使用这些数据来实现精确一次的消费者逻辑?

我正在使用设置为 concurrenc=4 的 ConcurrentKafkaListenerContainerFactory,并将 AckMode 设置为 MANUAL。 我目前的方法是使用redis进行去重: 我使用 topic:partition 作为 redis 键,offset 作为它的值,然后将即将到来的偏移量与 redis 中的值进行比较,如果偏移量比 redis 新(大),则继续业务逻辑,否则我忽略该消息。最后提交偏移量(ack.acknowledge())

但是这种方式不起作用,例如,如果重新平衡发生在 ack.acknowledge() 完成之前,那么它就会出现这个错误:org.apache.kafka.clients.consumer.CommitFailedException,

再平衡后,原来的分区被分配给另一个线程,导致同一条消息会被消费两次。

总之,如何设计一种逻辑,让每条kafka消息都只传递一次?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    您必须写出在 Kafka 之外以原子方式处理的最后一个偏移量以及处理结果。这可以是数据库或文件,只是不要进行两次写入,使其成为数据和偏移量的单个原子写入。如果您的消费者崩溃并且它或另一个实例重新启动或接管,您需要确保首先它读取与最后处理结果一起存储的最后一个偏移量,并在您 poll() 获取更多消息之前将 seek() 到该位置。这就是今天现有的 Kafka Sink Connector 可以实现 EOS 消费的数量。

    【讨论】:

    • 好主意!所以要实现这一点,我需要放弃 spring-kafka 并转向 kafka 官方低级 API
    • 我不太了解 spring-kafka api,无法发表评论,但您可以使用 Kafka Consumer API 或 Kafka Connect API 仅执行一次此类操作。在不久的将来,您将能够使用 Kafka Streams API,而不必在 Kafka 之外存储任何内容,因为 Kafka 本身将支持事务写入。
    【解决方案2】:

    Kafka 还不完全支持一次。它将在 0.11.0.0 版本中提供:https://issues.apache.org/jira/browse/KAFKA-4923 此版本计划于 2017 年 6 月 14 日发布,因此您可以等待或自己构建这个复杂的逻辑;-)

    【讨论】:

    • 我知道kafka暂时不支持,只是想知道有没有办法我们自己实现...
    • 这可能是一个有趣的阅读/起点:brooker.co.za/blog/2014/11/15/exactly-once.html
    • kafka v0.11 中的一次性语义将是:1. 从生产者的角度来看,以及 2. 从 kafka-streams 应用程序的角度来看。但是,从 kafka->kafka 消费者的角度来看,这并不是一次。通用消费者的用户仍然必须实现偏移量和应用程序状态的原子持久化,如果它存储在外部存储中。
    猜你喜欢
    • 2020-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-25
    • 2019-11-15
    • 2019-12-10
    • 2019-08-25
    • 1970-01-01
    相关资源
    最近更新 更多