【问题标题】:KafkaStreams - InconsistentGroupProtocolExceptionKafkaStreams - InconsistentGroupProtocolException
【发布时间】:2017-05-26 21:17:20
【问题描述】:

我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

我的代码库的另一部分直接使用消费者客户端与我们的集群建立连接。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如通过各种钩子或其他方法),但我更好奇为什么这些方法的混合有时会(间歇性地)导致InconsistentGroupProtocolException 被抛出。

有人可以解释一下为什么会抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜 Kafka Streams 构建的底层消费者指定的分区协议与KafkaConsumer 客户端不同。无论如何,任何有助于理解此异常的帮助将不胜感激

【问题讨论】:

  • 你想完成什么?
  • issues.apache.org/jira/browse/KAFKA-4113 参见 2017 年 1 月 3 日的评论。我遇到了这个问题,并认为这是最简单的解决方案
  • 我明白了。如果您可以确保消费者或应用程序处于活动状态,我想它可能会起作用。因此,在您开始一个或另一个之前,请确保消费者组中没有任何成员——它,该组是不活动的。参考文献github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/…(请注意,此调用不是公共 API 的一部分,可以随时更改,恕不另行通知。)

标签: apache-kafka apache-kafka-streams


【解决方案1】:

您自己提出答案。 Kafka Streams 使用自定义分区分配器,Kafka Streams 客户端仅适用于其他 Kafka Streams 客户端。如果您使用与您的 Kafka Streams 应用程序具有相同组 ID 的 KafkaConsumer,它将无法隔离 KafkaConsumers 以加入 Kafka Streams 消费者组。显然,KafkaConsumer 不能“玩”Kafka Streams。

【讨论】:

  • 这只涉及积极运行的消费者吗?我可以关闭所有“普通”消费者,然后使用相同的组 id 启动流应用程序吗?否则迁移到流是相当麻烦的,尤其是在不丢失数据(偏移)的情况下。
  • AFAIK,它只影响活动组。应该可以从普通的消费者应用程序切换到 Kafka Streams 应用程序,将group.id 重用为application.id。您将无法进行滚动升级,但如果您关闭消费者应用程序,应该可以将其作为 Kafka Streams 应用程序重新启动。
猜你喜欢
  • 2017-11-08
  • 2018-05-19
  • 2020-06-28
  • 2019-04-18
  • 2018-07-13
  • 2022-09-28
  • 2018-06-18
  • 2020-03-18
  • 1970-01-01
相关资源
最近更新 更多