【问题标题】:Kafka Consumer stops consuming messages on restarting KafkaKafka Consumer 在重启 Kafka 时停止消费消息
【发布时间】:2019-12-06 20:26:20
【问题描述】:

我有一个包含 3 个节点的 Zookeeper (version-3.4.13) 集群和一个包含 2 个节点的 Kafka (version-2.11-2.1.0) 集群.我正在使用 Spring Kafka version-2.1.9.RELEASE 来设置生产者/消费者。

我正在运行我的 Spring Boot 应用程序的 2 节点集群,具有特定主题的消费者。如果我停止两个 Kafka 节点,然后只启动其中一个(可能不充当“组协调器”),那么消费者会停止消费队列中的消息(即使在一个 Kafka 节点启动并运行之后),直到我重新启动我的Spring Boot 应用程序。

在这个问题上没有找到太多资源。需要了解造成这种情况的原因,Kafka 集群在哪里出现故障,然后其中一个节点是否启动(也许应该自动选举领导者)。

以下是我的消费者配置:

Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl + ":" + kafkaServerPort +
            (StringUtils.isEmpty(additionalBrokers)?"":","+additionalBrokers));
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
properties.put(JsonDeserializer.TRUSTED_PACKAGES, "com.test.model");
properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"com.test.EMailConfig");

我希望当只有一个 Kafka 集群节点再次启动时,消费者应该再次开始消费消息。

有人遇到过这样的问题吗?请发表您的建议。

以下是 1 个代理停止时 describe 命令输出的屏幕截图: enter image description here

提前致谢。

【问题讨论】:

  • 您是否已将主题配置为复制因子为 2?如果没有,你只能在两个经纪人都起来的情况下消费。 $ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic myTopic.
  • 感谢您的回复 Gary,是的,我已将主题配置为复制因子为 2。
  • 能否提供客户端日志(TRACE 或 DEBUG 级别)?您是否验证了所有分区领导者在其中 1 个关闭时在剩余的代理上被选举?
  • 消费者日志主要反映(当我关闭一个 Kafka 节点,可能是组协调器/领导者时):DEBUG o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=emailConsumerGroup-1] 由于没有可用节点,放弃发送元数据请求
  • 另外,当我描述消费者组时,它不显示任何分区的客户端 ID(当两个 Kafka 代理都运行时显示)。此外,当我从命令行描述 Kafka 主题时,它会在 ISR & Leader 列中显示代理的正确 ID。

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

该问题不可重现。至少对我来说。这就是我所做的

  1. 使用 docker 镜像启动了 kafka (2.12.24.0):https://codeload.github.com/wurstmeister/kafka-docker/zip/master
  2. 用项目启动了最小的spring-kafka:https://codeload.github.com/wurstmeister/kafka-docker/zip/master

然后我用邮递员发了一条消息;它产生到主题并立即被消耗,然后关闭 kafka 节点,然后启动 kafka 节点。现在我重新发送;它的生产和消费都很好。

【讨论】:

    猜你喜欢
    • 2017-09-23
    • 2018-06-11
    • 1970-01-01
    • 2018-11-28
    • 1970-01-01
    • 1970-01-01
    • 2018-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多