【发布时间】:2018-12-17 16:32:27
【问题描述】:
我不完全理解消费者错误处理如何与提交偏移量和 akcMode 一起工作,以及它如何因错误停止容器而受到影响(使用 spring-kafka 1.3.*)。
假设我有两个消费者(使用两个分区),他们在轮询时都从他们的分区中获取 5 个事件 (max.records.per.poll=5)。
第一个消费者 - 第一个事件处理正常,处理第二个事件失败 - 所以在错误处理程序中我调用kafkaListenerEndpointRegistry.stop(),但由于实施了停止它只是停止消费者轮询,两个消费者仍然完成处理他们当前的批次。因此,第一个消费者处理事件 3、4、5(所有这些事件都没有错误地处理),假设第二个消费者在第 4 个事件上失败(并且事件 1、2、3、5 处理正常)。
我的问题是将为每个消费者提供哪些补偿?
我的理解是:
- 当我将
AckMode.RECORD/BATCH与ackOnError结合使用时- 将为两个消费者提交最新的偏移量(5) - 当我将
AckMode.RECORD/BATCH与!ackOnError结合使用时 - 还将为两个消费者提交最新的偏移量 - 因为虽然在处理批处理期间某些事件失败,但批处理中最新处理的事件是正常的,因此最新处理的事件偏移量获胜.
我的理解正确吗?
【问题讨论】:
标签: apache-kafka kafka-consumer-api spring-kafka