【发布时间】:2019-08-23 03:20:25
【问题描述】:
我在 Kafka 代理中有一个包含 3 个分区的主题 1个分区有消息,2个分区是空的,我怎么知道一次调用消耗哪个分区?
首先我分配一个分区等于kafka.PartitionAny的TopicPartition,但是这个值一直返回-1
所以我必须手动使用计数器,当我从一个分区成功消费但消息为空时,然后count++ 并从下一个开始,直到找到消息
for{
partitions = append(partitions, kafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: offSet,
Error: err,
})
err = c.Assign(partitions)
if err != nil {
return err
}
// retrieve message
ev, err := c.Poll(-1)
if err != nil {
return err
}
// if no message, check the next partition
if ev == nil{
partition++
}else{
break
}
}
前两轮不返回任何消息,但要等待第三轮返回,有什么办法可以自动检测到哪个分区存储了未消费的消息?
如果没有其他办法,Kafka可以为我做轮询路由吗?或者我必须自己记录计数器
谢谢! :)
【问题讨论】:
-
如果您只想消费所有消息而不知道消息将存储在哪个分区,则可以使用
Subscribe()而不是Assign()。 -
@max23_感谢您的回复!但是如果我使用 Subscribe(),我该如何手动提交偏移量?
-
需要自己手动处理偏移量吗?有一个名为
enable.auto.commit的消费者配置默认设置为 true,它将定期在后台提交偏移量。 -
@max23_ 是的,我需要自己处理偏移量:( 订阅方法似乎无法指定要消耗的偏移量
-
哦,我看到你使用 subscribe 方法时有一个 rebalance 回调。它将返回您的 AssignedPartitions 或 RevokedPartitions 具有
TopicPartition列表的事件。可能可以像处理 AssignedPartitions 事件一样尝试一下。
标签: go apache-kafka kafka-consumer-api messagebroker