【问题标题】:Running kafka consumer(new Consumer API) forever永远运行 kafka 消费者(新的消费者 API)
【发布时间】:2016-10-31 22:22:14
【问题描述】:

我在Apache Kafka 上建立了一个排队系统。该应用程序将向特定的Kafka topic 生成消息,并且在消费者端,我必须使用该主题生成的所有记录。
我使用新的 Java Consumer Api 编写了消费者。 代码看起来像

  Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true) {  
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records){  
                             System.out.println("Data recieved : "+record.value());  
                             }  
                     }

这里我需要永远运行消费者,以便生产者推送到 kafka 主题的任何记录都应该立即消费和处理。
所以我的困惑是,使用无限while循环(如示例代码中)来消耗数据是否正确?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    虽然可以无限循环,但可以在 Kafka 消费者 documentation 中找到一种稍微优雅的方法,如下所示:

    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
    
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords records = consumer.poll(10000);
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
               consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
    

    这使您可以选择使用钩子正常关闭。

    【讨论】:

      【解决方案2】:

      是的,您可以使用无限循环。实际上,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,呼叫将等待给定的时间段。

      long millisToWait = 100;
      consumer.poll(millisToWait);
      

      新消费者自动处理网络通信问题。确保消费者在关闭时优雅地关闭。

      【讨论】:

        【解决方案3】:

        是的,这是使用无限循环来消耗数据的正确方法。

        消费者通常是长时间运行的应用程序 不断轮询 Kafka 以获取更多数据。消费者必须继续轮询 Kafka,否则他们将被考虑 死了,他们正在消耗的分区将被交给另一个 组中的消费者继续消费。

        poll() 返回记录列表。每条记录都包含主题和分区 记录来自,分区内记录的偏移量,以及 键和记录的值。记录的处理是特定于应用程序的。

        如果您退出循环,请始终在退出之前关闭()消费者。这将关闭网络连接和套接字,还会立即触发重新平衡。

        【讨论】:

          【解决方案4】:

          它对我有用,但您可能希望将内部循环放在 try/catch 块中,以防抛出任何异常。如果您断开连接,还可以考虑定期重新连接任务。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2017-11-16
            • 2021-02-08
            • 1970-01-01
            • 2020-09-19
            • 1970-01-01
            • 2018-07-11
            • 1970-01-01
            • 2017-09-23
            相关资源
            最近更新 更多