【发布时间】:2020-10-14 12:53:53
【问题描述】:
我正在使用 Apache Kafka API 并尝试一次只获取一条消息。我只写一个主题。我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息。我在文本框中输入了一个字符串,然后单击“发送”。我可以发送任意数量的消息。假设我发送了 3 条消息,而我的 3 条消息是“hi”、“lol”、“bye”。还有一个“接收”按钮。现在,使用TutorialsPoint 中的传统代码,当我单击接收按钮时,我会在控制台上一次打印所有 3 条消息(嗨、哈哈、再见)。但是,我只想在单击 UI 上的“接收”时一次打印一条消息。例如,我第一次点击接收按钮时,它会打印“hi”,第二次会打印“lol”,第三次会打印“bye”。我是 Kafka 的新手,对如何做到这一点感到困惑。我尝试从代码中删除两个循环,所以它只有
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());
如果我只有这两行代码,第一次点击接收按钮时,它会打印“hi”,但第二次按下它时,会收到消息“尝试心跳失败,因为组正在重新平衡 kafka。 "当我设置 max.poll.records = 1 时,我也遇到了错误,因为我希望所有消息最终,但是当按下接收按钮时,只需要将其中一个消息记录到控制台。下一次,将记录主题中未记录的下一条消息。
希望这是有道理的! 感谢任何帮助! 提前致谢!
编辑: 包含队列后的新代码,因此我们可以在发送和接收消息之间交替并在有新消息时更新队列:
if (payloadQueue.isEmpty()){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty()) {
log.info("No More Records to Add");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> {
log.info("RECORD: " + record);
payloadQueue.offer(record);
});
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
}
else {
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
}
【问题讨论】:
标签: java apache-kafka kafka-consumer-api