【问题标题】:Kafka pattern subscription. Rebalancing is not being triggered on new topicKafka 模式订阅。新主题不会触发再平衡
【发布时间】:2016-12-09 20:10:19
【问题描述】:

根据kafka javadocs 上的文档,如果我:

  • 订阅模式
  • 创建与模式匹配的主题

应该发生重新平衡,这使得消费者从那个新主题中读取。但这并没有发生。

如果我停止并启动消费者,它确实会选择新主题。所以我知道新主题符合模式。 https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics 中可能存在此问题的重复项,但该问题无处可寻。

我看到了 kafka 日志并且没有错误,它只是不会触发重新平衡。重新平衡会在消费者加入或死亡时触发,但不会在创建新主题时触发(即使将分区添加到现有主题时也不会,但这是另一个主题)。

我使用的是 kafka 0.10.0.0,以及“New Consumer API”的官方 Java 客户端,即代理 GroupCoordinator 而不是胖客户端 + zookeeper。

这是示例消费者的代码:

public class SampleConsumer {
public static void main(String[] args) throws IOException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        properties.setProperty("group.id", "my-group");

        System.out.println(properties.get("group.id"));
        consumer = new KafkaConsumer<>(properties);
    }
    Pattern pattern = Pattern.compile("mytopic.+");
    consumer.subscribe(pattern, new SampleRebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s %s\n", record.topic(), record.value());
        }
    }
}

}

在生产者中,我正在向名为 mytopic1、mytopic2 等的主题发送消息。

如果不触发再平衡,模式几乎毫无用处。

你知道为什么没有发生再平衡吗?

【问题讨论】:

  • 您等了多久才看到新主题是否开始被消费?组协调器代理异步检查新主题是否与模式匹配,并且它会定期进行。可能你没有等待足够长的时间。
  • @alexlod 您的评论方向正确。我找到了哪个属性控制检查的时间段并将其添加到答案中。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

文档提到“模式匹配将定期针对检查时存在的主题进行。”。事实证明,“定期”对应于 metadata.max.age.ms 属性。通过将该属性(在我的代码示例中的“consumer.props”中)设置为 5000,我可以看到它每 5 秒检测一次新主题和分区。

这是设计好的,根据这张jira票https://issues.apache.org/jira/browse/KAFKA-3854

关于 JIRA 的最后一条注释指出,稍后创建的与消费者订阅模式匹配的主题不会在创建时分配给消费者,这似乎是设计的。需要对相同模式重复 subscribe() 来处理这种情况。

刷新元数据轮询执行票证中提到的“重复订阅()”。

这令人困惑,因为 Kafka 0.8 是基于 zookeper 手表而不是轮询的真正触发。对于这种情况,IMO 0.9 更多的是降级,而不是“及时”的重新平衡,这变成了具有开销的高频轮询,或者在对新主题/分区做出反应之前长时间的低频轮询。

【讨论】:

    【解决方案2】:

    要立即触发重新平衡,您可以在订阅主题后显式进行轮询:

    kafkaConsumer.poll(pollDuration);
    

    参考: https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer

    【讨论】:

      【解决方案3】:

      在您的消费者代码中,使用以下代码:

      properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST)
      

      再试一次

      【讨论】:

        猜你喜欢
        • 2020-12-31
        • 2019-10-24
        • 1970-01-01
        • 2019-09-18
        • 2017-07-14
        • 2018-11-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多