【发布时间】:2020-12-07 16:17:13
【问题描述】:
我使用 confluent .net 客户端。 订阅者总是在重启后(订阅者服务重启)读取来自 Kafka 主题的所有消息。 如何提交消费者已经实现并从中读取的偏移量? 也许一些消费者配置可以帮助......
【问题讨论】:
标签: apache-kafka confluent-kafka-dotnet
我使用 confluent .net 客户端。 订阅者总是在重启后(订阅者服务重启)读取来自 Kafka 主题的所有消息。 如何提交消费者已经实现并从中读取的偏移量? 也许一些消费者配置可以帮助......
【问题讨论】:
标签: apache-kafka confluent-kafka-dotnet
这只是一个疯狂的猜测,但是如何声明消费者的组 id? 我见过一些使用这种随机分配的例子:
["group.id"] = Guid.NewGuid().ToString(),
如果您每次启动消费者时都声明一个新的/随机的group.id,这将导致在每次执行时注册一个新的消费者组,这涉及到auto.offset.reset。
如果此属性设置为“earliest”,那么每次启动消费者时(假设他们每次都有不同的 group.id),他们将从第一个可用偏移量开始,就像你的情况一样,读取所有的消息从头开始。
如果此属性设置为“latest”,并且您的生产者当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。
尝试设置一个固定的group.id:开始消费,当消息在代理上仍然可用时停止进程,然后再次启动消费者,而不更改最后一个group.id。
这一次,由于消费者组已经注册,auto.offset.reset 将被忽略,起始位置将由您提交的偏移量定义,默认情况下存储在名为 __consumer_offsets 的特殊主题中。
【讨论】: