【发布时间】:2017-05-26 21:17:20
【问题描述】:
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
我的代码库的另一部分直接使用消费者客户端与我们的集群建立连接。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如通过各种钩子或其他方法),但我更好奇为什么这些方法的混合有时会(间歇性地)导致InconsistentGroupProtocolException 被抛出。
有人可以解释一下为什么会抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜 Kafka Streams 构建的底层消费者指定的分区协议与KafkaConsumer 客户端不同。无论如何,任何有助于理解此异常的帮助将不胜感激
【问题讨论】:
-
你想完成什么?
-
issues.apache.org/jira/browse/KAFKA-4113 参见 2017 年 1 月 3 日的评论。我遇到了这个问题,并认为这是最简单的解决方案
-
我明白了。如果您可以确保消费者或应用程序处于活动状态,我想它可能会起作用。因此,在您开始一个或另一个之前,请确保消费者组中没有任何成员——它,该组是不活动的。参考文献github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/…(请注意,此调用不是公共 API 的一部分,可以随时更改,恕不另行通知。)
标签: apache-kafka apache-kafka-streams