【发布时间】:2020-06-28 19:38:12
【问题描述】:
我编写 KafkaStreams 应用程序并将 maximum.num.threads 设置为 1。我有三个源主题,分别有 6、8、8 个分区。当前使用 4 个实例运行此流拓扑,因此 4 个正在运行的流线程。
我在我的一个 kafka 主题中收到了 INCOMPLETE_SOURCE_TOPIC_METADATA。我从github找到了下面的代码 抛出这个错误并试图理解代码
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} during assignment. Returning error {}.",
topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return new GroupAssignment(
errorAssignment(clientMetadataMap, topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
);
}
}
for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), topic);
}
}
我的问题:
这个错误是因为 Kafka 主题的分区不匹配,还是
TopicsInfo当时不可用(想想 Kafka 组失去了对 Kafka 主题的访问权限)?topicsInfo.repartitionSourceTopics调用是什么意思?
【问题讨论】:
-
你能分享你的拓扑,或者你想从 3 个输入主题中实现什么吗?
标签: apache-kafka kafka-consumer-api apache-kafka-streams apache-kafka-connect