【发布时间】:2019-07-16 13:33:48
【问题描述】:
我有三个问题:
- “最旧的偏移量”是什么意思?最旧的偏移量并不意味着偏移量 0?
// OffsetOldest 代表代理上可用的最旧偏移量
// 分区。
OffsetOldest int64 = -2
-
假设
A.三个代理在同一台机器上运行
B. 消费组只有一个消费线程
C. 消费者配置 OffsetOldest 标志。
D. 已经产生了 100 条消息,目前消费者线程已经消耗了 90 条消息。那么如果消费者线程重新启动,那么这个消费者将从哪个偏移量开始消费呢?是 91 还是 0?
-
在我们下面的代码中,似乎每次启动消费者时都会重新使用消息。但实际上它并不总是发生。为什么重用只是在重启后发生几次(不是全部)?
func (this *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { this.handler(message) session.MarkMessage(message, "") } return nil } ctx := context.Background() conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Return.Errors = true consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf) if err != nil { logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err) return }
【问题讨论】:
标签: go apache-kafka kafka-consumer-api sarama