【问题标题】:How to get messages from Kafka Consumer one by one in java?java - 如何在java中一一获取来自Kafka Consumer的消息?
【发布时间】:2020-10-14 12:53:53
【问题描述】:

我正在使用 Apache Kafka API 并尝试一次只获取一条消息。我只写一个主题。我可以通过带有文本框的弹出 UI 屏幕来发送和接收消息。我在文本框中输入了一个字符串,然后单击“发送”。我可以发送任意数量的消息。假设我发送了 3 条消息,而我的 3 条消息是“hi”、“lol”、“bye”。还有一个“接收”按钮。现在,使用TutorialsPoint 中的传统代码,当我单击接收按钮时,我会在控制台上一次打印所有 3 条消息(嗨、哈哈、再见)。但是,我只想在单击 UI 上的“接收”时一次打印一条消息。例如,我第一次点击接收按钮时,它会打印“hi”,第二次会打印“lol”,第三次会打印“bye”。我是 Kafka 的新手,对如何做到这一点感到困惑。我尝试从代码中删除两个循环,所以它只有

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有这两行代码,第一次点击接收按钮时,它会打印“hi”,但第二次按下它时,会收到消息“尝试心跳失败,因为组正在重新平衡 kafka。 "当我设置 max.poll.records = 1 时,我也遇到了错误,因为我希望所有消息最终,但是当按下接收按钮时,只需要将其中一个消息记录到控制台。下一次,将记录主题中未记录的下一条消息。

希望这是有道理的! 感谢任何帮助! 提前致谢!

编辑: 包含队列后的新代码,因此我们可以在发送和接收消息之间交替并在有新消息时更新队列:

        if (payloadQueue.isEmpty()){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                log.info("No More Records to Add");
                consumer.close();
            }
            else {
                records.iterator().forEachRemaining(record -> {
                    log.info("RECORD: " + record);
                    payloadQueue.offer(record);
                });
                payload = payloadQueue.poll().value();
                log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
            }
        }
        else {
            payload = payloadQueue.poll().value();
            log.info("Received event from KAFKA on subject {} with payload \"{}\"", subject, payload);
        }

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    Kafka使用batch get来提高性能,真的没必要设置max.poll.records=1。
    通过一些解决方法可以轻松实现您想要的。

    解决方案

    你可以有一个Queue来存储消息,每次按下接收按钮,你从队列中轮询一条消息,如果队列是空的,你调用consumer.poll来填充队列。

    代码

        private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        public void buttonPressed(){
            if (queue.isEmpty()){
                consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
            }else {
                System.out.println(queue.poll());
            }
        }
    

    【讨论】:

    • 非常感谢!!!关于 UI,我唯一忘记提及的是:假设我发送了 3 条消息(hi、lol、bye),然后单击接收两次,然后返回“hi”和“lol”。即使我还没有记录所有消息,我也想发送另一条消息“你好”。接下来的两次我点击接收按钮,我会得到“再见”和“你好”。我已经编辑了上面的帖子,以显示我的 buttonPressed() 方法的新代码。上面的代码是最好的方法吗?看起来很乱。另外,我在两个地方都有“consumer.close()”。如果我在第一个 if 中没有“consumer.close()”,它会给出错误。为什么?
    • 基本上,我想在发送新消息时动态更新队列,而无需触及 sendButton() 方法的代码。
    • 1.这个方法不需要关闭消费者,只需要在应用关闭的时候关闭消费者。 2.不必每次都调用consumer.poll,只需要在队列为空时调用即可。 3. 如果你想在不按下按钮的情况下更新队列,你将有另一个线程定期将消息轮询到队列中。
    • 谢谢!我现在了解心跳错误。最后一个问题(希望如此!):如果队列为空并且我进行了轮询但没有更多消息,我再次收到 group is rebalancing 错误。我明白为什么因为集合是空的。我该如何处理?我有一个“records.isEmpty()”if 语句,但它仍然会在记录“不再添加记录”之前和之后记录“组正在重新平衡错误”。我在帖子中有更新的代码。再次感谢!
    • 我终于收到了这条消息:会员consumer-kafka-consumer-group-1-645c24e8-05b6-43c4-a45a-dedbbdc0f105 向协调器localhost:9092发送LeaveGroup请求(id: 2147482646 rack: null)由于消费者轮询超时已过期。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间来处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 返回的批次的最大大小来解决此问题。
    【解决方案2】:

    您应该按照@haoyu 的建议添加一个队列,但手动提交消耗的偏移量。否则,应用程序重置可能会导致数据丢失(因为消息已从主题中消费,尽管没有打印到 UI 中)。

    推荐阅读KafkaConsumerjavadoc的“手动偏移控制”和“在Kafka之外存储偏移”一节。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-08-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-22
      • 2023-01-27
      • 1970-01-01
      相关资源
      最近更新 更多