【发布时间】:2021-08-16 20:56:03
【问题描述】:
我正在使用 @KafkaListener 和 props as
max.poll.records 为 50。(每条记录需要 40-60 秒来处理)
启用自动提交=false
确认模式到手动立即
下面是逻辑
@KafkaListener(groupId=“ABC”, topic=“Data1” containerFactory=“myCustomContainerFactory”)
public void listen(ConsumerRecord<String, Object> record, Acknowledge ack) {
try{
process(record);
ack.acknowledge();
}
Catch(e){
reprocess() // pause container and seek
}
}
max.poll.interval.ms、session.timeout.ms 或 heartbeat 等其他属性为默认值
我无法理解这里出了什么问题,
假设如果 500 条消息发布到 2 个分区
-
我不确定为什么消费者没有按照 max.poll.records 属性轮询记录,实际上它会在应用程序启动或消息由生产者发布后立即轮询所有 500 个消息
-
据观察,在处理一些记录后,消费者会在大约 5-7 分钟后再次读取偏移量。实际上,它已被读取并经过精细处理和确认。
一小时后,日志文件显示相同的消息被多次读取。
任何帮助表示赞赏 谢谢。
【问题讨论】:
标签: apache-kafka kafka-consumer-api spring-kafka producer-consumer consumer