【发布时间】:2017-06-06 13:37:41
【问题描述】:
我设法在使用@KafkaListener 注释的方法中获取主题/分区/偏移量,但我如何使用这些数据来实现精确一次的消费者逻辑?
我正在使用设置为 concurrenc=4 的 ConcurrentKafkaListenerContainerFactory,并将 AckMode 设置为 MANUAL。 我目前的方法是使用redis进行去重: 我使用 topic:partition 作为 redis 键,offset 作为它的值,然后将即将到来的偏移量与 redis 中的值进行比较,如果偏移量比 redis 新(大),则继续业务逻辑,否则我忽略该消息。最后提交偏移量(ack.acknowledge())
但是这种方式不起作用,例如,如果重新平衡发生在 ack.acknowledge() 完成之前,那么它就会出现这个错误:org.apache.kafka.clients.consumer.CommitFailedException,
再平衡后,原来的分区被分配给另一个线程,导致同一条消息会被消费两次。
总之,如何设计一种逻辑,让每条kafka消息都只传递一次?
【问题讨论】: