【问题标题】:Kafka: Monitor the lag for the consumers that are assigned to partitions topicKafka:监控分配给分区主题的消费者的滞后
【发布时间】:2021-01-31 02:58:44
【问题描述】:

我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进步(意味着滞后)。由于我添加了 group id consumer-tutorial 作为属性,我假设我可以使用命令

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

(这里解释为http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

很遗憾,我的消费者组详细信息未使用上述命令显示。因此我无法监控我的消费者的进度(这是滞后的)。如何监控上述场景(手动分配分区)中的延迟?

代码是:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


        String topic = "my-topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
     System.out.println(record.offset() + ": " + record.value());
  consumer.commitSynch();
  }
} finally {
  consumer.close();
}

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    以防万一您不想编写代码来获取此信息或临时运行类似命令的工具/shell 脚本,有 N 个工具可以捕获 Kafka 指标,包括 Consumer Lag。在我脑海中浮现:BurrowSPM for Kafka 干得好。这里有一些关于 Kafka 偏移量、消费者延迟和一些从 Kafka 通过 JMX 公开的内容派生的指标的背景知识。 HTH。

    【讨论】:

    • 背景资料在哪里?我看不到链接。
    【解决方案2】:

    如果你对 JMX 暴露消费群体滞后感兴趣,这里是我写的代理: https://github.com/peterkovgan/kafka9.offsets

    您可以在某个 Kafka 节点上运行此代理并将偏移滞后统计信息公开给外部阅读器。

    有一些示例如何将此代理与 Telegraf 一起使用 (https://influxdata.com/time-series-platform/telegraf/)。

    最后(结合例如 telegraf、influxdb 和 grafana)您可以看到几个消费者群体的偏移滞后的漂亮图表。

    【讨论】:

      【解决方案3】:

      kafka-consumer-groups.sh 命令中,您的组名不正确--group consumer-tutorial 不是consumer-tutorial-group

      【讨论】:

      • 嗨!谢谢你的观察。即使我更改了组名,我也无法使用上述命令。我会在帖子中修改。
      【解决方案4】:

      您的代码问题与手动将消费者分配到主题分区直接相关。

      您在group.id 属性中指定了一个消费者组,但是,组ID 仅在您通过KafkaConsumer.subscribe() API 订阅一个主题(或一组主题)时使用。在您的示例中,您使用的是 .assign() 方法,该方法手动将客户端附加到指定的主题-分区对,而不使用底层消费者组原语。正是由于这个原因,您无法看到消费者滞后。 Burrow 之类的工具在这种情况下将不起作用,因为它们会查询消费者组的偏移量,而消费者组并不存在。

      您有两种选择:

      1. 使用subscribe() API 正确使用消费者组功能。这是 Kafka 的主要用例。但是,seekToBeginning() 在这种情况下也不起作用,因为偏移量将完全由消费者组管理。
      2. 完全删除消费者组并手动管理分区分配和偏移量。这为您提供了最大可能的灵活性,但工作量很大,您可能会发现自己在重新发明轮子。大多数人不会走这条路,除非 Kafka 的消费组功能不适合你的需求。

      选择将完全取决于您的用例。对于传统的流处理,#1 是惯用的方法。这就是卡夫卡的设计目的。 #2 意味着您知道自己在做什么,并将所有组管理责任转移到您的应用程序上。

      注意:Kafka 没有“部分”模式,您可以在其中进行一些组管理,而 Kafka 负责其余的工作。要么全押,要么根本没有。

      【讨论】:

        【解决方案5】:

        您可以使用简单而强大的工具进行延迟监控

        prometheus-kafka-consumer-group-exporter

        参考以下网址:

        https://github.com/braedon/prometheus-kafka-consumer-group-exporter

        安装后运行以下命令以在所需端口 Prometheus Kafka Consumer Group Exporter 上导出消费者矩阵

        /usr/bin/python3 /usr/local/bin/prometheus-kafka-consumer-group-exporter -p PORT -b KAFKA_CLUSTER_IP_PORT

        运行上述命令后,验证 http url YOUR-SERVER-IP:PORT 上的数据,例如 127.0.0.1:9208

        现在您可以将任何 JMX 刮板用于仪表板和警报系统。我正在使用 prometheus 和 grafana

        这可以在任何共享服务器上运行,例如 [kafka broker、zookeeper 服务器、prometheus 服务器或任何],因为它对系统资源的开销非常低。

        【讨论】:

          猜你喜欢
          • 2018-06-26
          • 1970-01-01
          • 2022-06-14
          • 2017-10-17
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-12-28
          • 1970-01-01
          相关资源
          最近更新 更多