【问题标题】:Find partition(s) assigned to Kafka stream instance查找分配给 Kafka 流实例的分区
【发布时间】:2020-11-30 20:38:45
【问题描述】:

我有一个订阅许多主题的 Kafka 流应用程序,每个主题都有许多分区。 当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给了我的应用程序的当前实例?我想知道这个独立于任何记录是否已被此实例处理。

我知道当我得到一条记录时,我可以通过processorContext.partition()processorContext.topic() 来获取当前正在处理的记录的分区/主题信息。但我不是在寻找那个。

我正在寻找与 kafka 流端的 KafkaConsumer.assigment 等效的东西。

我也尝试了以下代码,但我得到 s 的大小为 0。

<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
    Set<TopicPartition> tp = m.topicPartitions();
    System.out.println  ("TopicPartition: " + tp.toString());
}

【问题讨论】:

  • 用例问题:为什么您的应用中需要这些信息?
  • 我想从我的应用程序的许多实例中选出一个领导者实例。分配给它的主题的分区 0 的实例将充当领导者并执行一些特殊任务。如果可能的话,想避免使用动物园管理员/其他基础设施。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

更新答案(2020 年 11 月):

当我创建应用拓扑并启动它时,我是否知道哪些主题的哪些分区分配给了我的应用的当前实例?

如果我理解正确,您可以按如下方式执行此操作。在您的应用程序实例中,使用KafkaStreams#localThreadsMetadata() 获取所有本地流线程(该应用程序实例的)的ThreadMetadataThreadMetadata 包含 TaskMetadata 用于该流线程上的所有活动和备用任务。 TaskMetadata 有一个方法 topicPartitions() 来获取输入主题分区。

旧的、过时的答案:据我所知,Kafka Streams 中没有现有的 API 可以公开此信息。可以从 Kafka 消费者(由 Kafka Streams 使用)获取此信息,但不会在 Kafka Streams 中公开。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-29
  • 2018-10-14
  • 1970-01-01
  • 1970-01-01
  • 2018-06-21
  • 1970-01-01
  • 2018-10-08
相关资源
最近更新 更多