【问题标题】:Kafka command-line consumer reads, but cannot read through JavaKafka命令行消费者读取,但无法通过Java读取
【发布时间】:2019-05-14 09:46:08
【问题描述】:

我已使用此命令手动创建主题test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

并使用此命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

我插入了这些记录:

This is a message
This is another message
This is a message2

首先,我通过这样的命令行使用消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

并且所有记录都成功显示。然后,我尝试使用以下代码在 Java 中实现消费者:

public class KafkaSubscriber {

    public void consume() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        // also with this command
        // consumer.subscribe(Arrays.asList("test"));

        System.out.println("Starting to read data...");

        try {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    System.out.println("Number of records found: " + records.count());
                    for (ConsumerRecord rec : records) {
                        System.out.println(rec.value());
                    }
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
        catch (Exception e) {
                e.printStackTrace();
        } finally {
            consumer.close();
        }
}

但是输出是:

Starting to read data...
0
0
0
0
0
....

这意味着它在主题test 中没有找到任何记录。我还尝试在 Java 使用者启动之后发布一些记录,但同样如此。有什么想法可能出了什么问题?


编辑:添加以下行后:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

消费者现在只在我向主题写入新记录时读取。它不会从一开始就读取所有记录。

【问题讨论】:

  • 能否请您在属性中添加ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest 然后发布结果?
  • @Bitswazsky 我用结果编辑了我的问题。它现在在发布者写入新内容时读取,但当消费者开始时,它不会读取主题中已有的内容。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

默认情况下,如果之前没有为组提交偏移量,则消费者从结束主题开始。

因此,如果您在生成记录后运行它,它将不会接收它们。

在您的kafka-console-consumer.sh 中注意,您有--from-beginning 标志,它强制消费者从主题的开头开始。

正如评论中所建议的,一种解决方法是将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为earliest。但是,我会小心使用该设置,因为您的消费者将从主题开始就开始消费,这在实际用例中可能是大量数据。

最简单的解决方案是现在您已经运行了一次消费者并且它已经创建了一个组,您可以简单地重新运行生产者。之后,当您再次运行消费者时,它将从位于新生产者消息之前的最后一个位置开始。

另一方面,如果您的意思是始终重新使用所有消息,那么您有两种选择:

  • 当您的消费者开始将其位置移动到主题的开头时,显式使用seekToBeginning()

  • auto.offset.reset 设置为earliest 并通过将enable.auto.commit 设置为false 来禁用自动偏移提交

【讨论】:

  • 感谢您的回答。我想要的是当消费者开始阅读主题中已经写的所有内容时。我将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为earliest,但现在消费者只有在发布者写入新内容后才会阅读。
  • 在这种情况下,您有 2 个选项:1)当您的消费者开始将其位置移动到主题的开头时,明确使用 seekToBeginning() 2)将 auto.offset.reset 设置为最早并禁用偏移提交
猜你喜欢
  • 1970-01-01
  • 2022-01-20
  • 1970-01-01
  • 2018-01-17
  • 2018-07-27
  • 1970-01-01
  • 2020-09-22
  • 1970-01-01
  • 2021-04-26
相关资源
最近更新 更多