【问题标题】:Kafka Consumer poll and reconnectionKafka消费者轮询和重新连接
【发布时间】:2016-07-26 18:10:01
【问题描述】:

我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。

1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行

    Consumer<String, String> consumer =  new KafkaConsumer<String,String>(props);
    consumer.subscribe(getConsumerRegisteredTopics());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records){
           processRecord (record)
        }
   }  

2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪活动的消费者。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。能否请您帮助我了解 KafkaConsumer 民意调查调用的行为。

提前致谢。

【问题讨论】:

    标签: apache-kafka long-polling kafka-consumer-api


    【解决方案1】:

    1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是 KafkaConsumer 'poll' 调用会被 undefinetly 阻塞,直到它能够连接和读取元数据。换句话说……您的消费者实际上并没有连接,而是在等待 kafka 集群出现。

    2) 轮询超时告诉消费者等待多长时间才能返回任何数据。您必须确保在 poll 返回后您再次调用 poll 足以让您的消费者保持活跃。给轮询调用的超时与 KafkaConsumer 的 keepalive 机制无关(这由您的消费者的消费者属性的 session.timeout.ms 属性控制)。

    【讨论】:

    • 感谢您的回复。如果我的消费者在轮询方法中等待 Long.MAX_VALUE,它将如何发送心跳以及 Kafka 服务器/Zookeeper 将如何知道我的消费者还活着。
    • 心跳是为了确保您的应用程序代码是活动的。所以只要你的程序中的控制流在 poll 方法中,你就不必担心心跳。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-03
    • 1970-01-01
    • 2019-03-14
    • 1970-01-01
    • 2018-09-29
    • 2017-06-19
    相关资源
    最近更新 更多