【问题标题】:Spring Kafka - Re reading an offset after sometimeSpring Kafka - 一段时间后重新读取偏移量
【发布时间】:2021-08-16 20:56:03
【问题描述】:

我正在使用 @KafkaListener 和 props as

max.poll.records 为 50。(每条记录需要 40-60 秒来处理)

启用自动提交=false

确认模式到手动立即

下面是逻辑

@KafkaListener(groupId=“ABC”, topic=“Data1” containerFactory=“myCustomContainerFactory”)
public void listen(ConsumerRecord<String, Object> record, Acknowledge ack) {

try{

process(record);

ack.acknowledge();

}

Catch(e){
 reprocess() // pause container and seek

  }
}

max.poll.interval.ms、session.timeout.ms 或 heartbeat 等其他属性为默认值

我无法理解这里出了什么问题,

假设如果 500 条消息发布到 2 个分区

  1. 我不确定为什么消费者没有按照 max.poll.records 属性轮询记录,实际上它会在应用程序启动或消息由生产者发布后立即轮询所有 500 个消息

  2. 据观察,在处理一些记录后,消费者会在大约 5-7 分钟后再次读取偏移量。实际上,它已被读取并经过精细处理和确认。

一小时后,日志文件显示相同的消息被多次读取。

任何帮助表示赞赏 谢谢。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka producer-consumer consumer


    【解决方案1】:

    默认的max.poll.interval.ms 是 300,000 毫秒(5 分钟)。

    您要么需要减少 max.poll.records 要么增加间隔 - 否则 Kafka 将由于无响应的消费者而强制重新平衡。

    处理时间这么长,推荐max.poll.records=1;您显然不需要更高的吞吐量。

    【讨论】:

    • 感谢@Gary 的回复.. 这有帮助.. 会试一试。
    猜你喜欢
    • 2017-01-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-16
    • 1970-01-01
    • 1970-01-01
    • 2021-10-05
    • 1970-01-01
    相关资源
    最近更新 更多