【发布时间】:2016-10-22 22:33:25
【问题描述】:
关于 KafkaConsumer (>=0.9),我在尝试实施满足我需求的解决方案时遇到了一些严重问题。
假设我有一个函数,它只需要读取来自 kafka 主题的 n 条消息。
例如:getMsgs(5) --> 获取主题中的下 5 条 kafka 消息。
所以,我有一个看起来像这样的循环。使用实际正确的参数进行编辑。在这种情况下,消费者的 max.poll.records 参数设置为 1,因此实际循环只迭代了一次。不同的消费者(其中一些通过许多消息迭代)共享一个抽象的父亲(这个),这就是它被编码的原因。 numMss 部分是针对该消费者的临时部分。
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records)
{
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
{
exit=true;
break;
}
}
}
考虑到这一点,问题在于 poll() 方法可能会收到超过 5 条消息。例如,如果它收到 10 条消息,我的代码将永远忘记其他 5 条消息,因为 Kafka 会认为它们已经被消费了。
我尝试提交偏移量但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
即使使用偏移配置,每当我再次启动消费者时,它不会从第 6 条消息开始(记住,我只想要 5 条消息),但是从 11 日 开始(因为第一次投票消耗了 10 条消息)。
是否有任何解决方案,或者也许(最肯定)我错过了什么?
提前致谢!!
【问题讨论】:
标签: apache-kafka kafka-consumer-api