【发布时间】:2019-08-14 18:42:10
【问题描述】:
我是 kafka 的新手,一直在尝试实现消费者。下面是我的场景
- 启动消费者应用程序
- 产生来自生产者的消息。这些消息由消费者消费
- 停止消费者并再次产生消息。当我启动消费者时,在消费者停止时发布的消息没有被读取
虽然auto.offset.commit=earliest会消费消息,但它会消费所有发布到主题的消息。 我只想消费消费者宕机时发布的那些消息。
var options = new KafkaOptions(new Uri(kafkaUri));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions(kafkaTopic, router));
var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());
foreach (Message msg in consumer.Consume()) {
string kafkaResponse = System.Text.Encoding.UTF8.GetString(msg.Value);
Console.WriteLine("PickList Json : " + kafkaResponse);
offsetCommitRequest.Offset = msg.Meta.Offset;
offsetCommitRequest.PartitionId = msg.Meta.PartitionId;
offsetCommitRequest.Topic = kafkaTopic;
offsetCommitRequest.Metadata = "CommitOffset";
var offsetCommitResponse = await _kafkaPublishService.SetOffsetvalue(kafkaUri, kafkaTopic, consumerGroup, offsetCommitRequest);
}
【问题讨论】:
-
@PraveenRewar,该链接没有回答我的问题。我想阅读在消费者关闭并随后重新启动期间发布的记录。我的 auto.offset.commit 设置为 false,因为我在处理消息后手动提交。
-
我不明白消费者重启如何影响这里的情况。 Kafka 将记住您的消费者阅读的最后一条消息,现在即使您的消费者宕机并且在主题中发布了更多消息,Kafka 仍将存储消息应该由特定消费者消费的位置的偏移量。
标签: apache-kafka kafka-consumer-api kafka-net