【问题标题】:How to consume kafka messages from last saved offset如何使用上次保存的偏移量的 kafka 消息
【发布时间】:2019-08-14 18:42:10
【问题描述】:

我是 kafka 的新手,一直在尝试实现消费者。下面是我的场景

  1. 启动消费者应用程序
  2. 产生来自生产者的消息。这些消息由消费者消费
  3. 停止消费者并再次产生消息。当我启动消费者时,在消费者停止时发布的消息没有被读取

虽然auto.offset.commit=earliest会消费消息,但它会消费所有发布到主题的消息。 我只想消费消费者宕机时发布的那些消息。

var options = new KafkaOptions(new Uri(kafkaUri));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions(kafkaTopic, router));
var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

foreach (Message msg in consumer.Consume()) {
    string kafkaResponse = System.Text.Encoding.UTF8.GetString(msg.Value);

    Console.WriteLine("PickList Json : " + kafkaResponse);

    offsetCommitRequest.Offset = msg.Meta.Offset;
    offsetCommitRequest.PartitionId = msg.Meta.PartitionId;
    offsetCommitRequest.Topic = kafkaTopic;
    offsetCommitRequest.Metadata = "CommitOffset";
    var offsetCommitResponse = await _kafkaPublishService.SetOffsetvalue(kafkaUri, kafkaTopic, consumerGroup, offsetCommitRequest);
}

【问题讨论】:

  • @PraveenRewar,该链接没有回答我的问题。我想阅读在消费者关闭并随后重新启动期间发布的记录。我的 auto.offset.commit 设置为 false,因为我在处理消息后手动提交。
  • 我不明白消费者重启如何影响这里的情况。 Kafka 将记住您的消费者阅读的最后一条消息,现在即使您的消费者宕机并且在主题中发布了更多消息,Kafka 仍将存储消息应该由特定消费者消费的位置的偏移量。

标签: apache-kafka kafka-consumer-api kafka-net


【解决方案1】:

您应该在不设置任何手动偏移量的情况下重新启动您的消费者。 意味着你不应该这样做:

var offset = consumer.GetTopicOffsetAsync(kafkaTopic, 100000).Result;
var t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());

您的消费者将从上次提交的地方赶上来

亚尼克

【讨论】:

    猜你喜欢
    • 2019-06-08
    • 1970-01-01
    • 2019-01-06
    • 1970-01-01
    • 2021-02-14
    • 2019-01-19
    • 1970-01-01
    • 2018-05-08
    • 1970-01-01
    相关资源
    最近更新 更多