【发布时间】:2020-11-30 20:38:45
【问题描述】:
我有一个订阅许多主题的 Kafka 流应用程序,每个主题都有许多分区。 当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给了我的应用程序的当前实例?我想知道这个独立于任何记录是否已被此实例处理。
我知道当我得到一条记录时,我可以通过processorContext.partition() 和processorContext.topic() 来获取当前正在处理的记录的分区/主题信息。但我不是在寻找那个。
我正在寻找与 kafka 流端的 KafkaConsumer.assigment 等效的东西。
我也尝试了以下代码,但我得到 s 的大小为 0。
<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
Set<TopicPartition> tp = m.topicPartitions();
System.out.println ("TopicPartition: " + tp.toString());
}
【问题讨论】:
-
用例问题:为什么您的应用中需要这些信息?
-
我想从我的应用程序的许多实例中选出一个领导者实例。分配给它的主题的分区 0 的实例将充当领导者并执行一些特殊任务。如果可能的话,想避免使用动物园管理员/其他基础设施。
标签: apache-kafka apache-kafka-streams