【发布时间】:2016-12-09 20:10:19
【问题描述】:
根据kafka javadocs 上的文档,如果我:
- 订阅模式
- 创建与模式匹配的主题
应该发生重新平衡,这使得消费者从那个新主题中读取。但这并没有发生。
如果我停止并启动消费者,它确实会选择新主题。所以我知道新主题符合模式。 https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics 中可能存在此问题的重复项,但该问题无处可寻。
我看到了 kafka 日志并且没有错误,它只是不会触发重新平衡。重新平衡会在消费者加入或死亡时触发,但不会在创建新主题时触发(即使将分区添加到现有主题时也不会,但这是另一个主题)。
我使用的是 kafka 0.10.0.0,以及“New Consumer API”的官方 Java 客户端,即代理 GroupCoordinator 而不是胖客户端 + zookeeper。
这是示例消费者的代码:
public class SampleConsumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.setProperty("group.id", "my-group");
System.out.println(properties.get("group.id"));
consumer = new KafkaConsumer<>(properties);
}
Pattern pattern = Pattern.compile("mytopic.+");
consumer.subscribe(pattern, new SampleRebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s %s\n", record.topic(), record.value());
}
}
}
}
在生产者中,我正在向名为 mytopic1、mytopic2 等的主题发送消息。
如果不触发再平衡,模式几乎毫无用处。
你知道为什么没有发生再平衡吗?
【问题讨论】:
-
您等了多久才看到新主题是否开始被消费?组协调器代理异步检查新主题是否与模式匹配,并且它会定期进行。可能你没有等待足够长的时间。
-
@alexlod 您的评论方向正确。我找到了哪个属性控制检查的时间段并将其添加到答案中。
标签: java apache-kafka kafka-consumer-api