【问题标题】:How can I list the Kafka consumer lag and latest offset per partition and consumer group using the Kafka .NET client API?如何使用 Kafka .NET 客户端 API 列出每个分区和消费者组的 Kafka 消费者滞后和最新偏移量?
【发布时间】:2022-02-12 22:25:13
【问题描述】:

我正在尝试使用 .NET Confluent.Kafka 1.4.0-RC1(适用于 Net472)来获取消费者滞后(consumer lag)。我可以使用下面这个脚本得到想要的结果:

$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group Grp1 --describe

结果:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
Grp1            test3           1          15              15              0               rdkafka-ca76855f-7b66-4bf1-82bc-73e9a1c1cf71 /10.186.129.93  rdkafka
Grp1            test3           2          13              13              0               rdkafka-d64379dc-881a-4f6f-a793-51e832cc2f5a /10.186.129.93  rdkafka
Grp1            test3           0          9               9               0               rdkafka-a25bdb80-3b70-4e42-963e-d41ad9e2a99a /10.186.129.93  rdkafka
Grp1            test            0          68              68              0               -                                            -               -

我无法用 .NET 客户端代码得到类似的报告。下面是我一直在尝试的代码,但由于 consumer.Assignment 属性返回的是空集合,所以得不到任何结果。

/// <summary>
/// Builds a plain-text report with the current position and the committed
/// offsets of every partition currently assigned to a freshly built consumer.
/// </summary>
/// <param name="bootstrapServers">Broker list used as <c>BootstrapServers</c>.</param>
/// <param name="consumerGroupName">Consumer group used as <c>GroupId</c>.</param>
/// <returns>The report text: a header followed by one section per assigned partition.</returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
    var report = new StringBuilder();
    report.AppendLine("\n");
    report.AppendLine("Consumer Group Lag Report");
    report.AppendLine("-------------------------");

    var consumerConfig = new ConsumerConfig {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()) {
        // NOTE(review): nothing in this method calls Assign or Subscribe before
        // Assignment is read, so the collection is expected to be empty and the
        // loop body never runs; only the header is returned in that case.
        foreach (TopicPartition assigned in consumer.Assignment) {
            // Current position (offset) of this consumer for the partition.
            Offset position = consumer.Position(assigned);
            report.AppendLine($"Offset value is: {position.Value}");

            // Committed offsets for the whole current assignment, re-queried
            // for each assigned partition with a 4 second timeout.
            foreach (TopicPartitionOffset committedEntry in consumer.Committed(TimeSpan.FromSeconds(4))) {
                report.AppendLine($"Commited offset for partition {committedEntry.TopicPartition} is {committedEntry.Offset}");
            }
        }
    }

    return report.ToString();
}

我希望获取每个分区/消费者组的消费者滞后和最新偏移量。

【问题讨论】:

  • 我建议设置外部工具,以监控所有语言的所有消费者的延迟

标签: .net apache-kafka confluent-platform


【解决方案1】:

好的,我终于找到了方法。看起来需要先把主题分区显式分配(Assign)给消费者。就我而言,主题是 test3,共有 3 个分区,这些必须在代码中设置。我想更通用的实现是将与该组关联的 List<TopicPartition> 传递给此方法。以下是我获取消费者滞后的方法:

/// <summary>
/// Builds a plain-text lag report for the given consumer group over the three
/// partitions (0, 1, 2) of topic <c>test3</c>.
/// </summary>
/// <remarks>
/// For each partition the committed offset of the group is compared with the
/// high watermark returned by the broker; lag is <c>high watermark - committed</c>.
/// Partitions for which the group has no committed offset are reported on a
/// separate line instead of producing a lag computed from the sentinel value.
/// </remarks>
/// <param name="bootstrapServers">Broker list used as <c>BootstrapServers</c>.</param>
/// <param name="consumerGroupName">Consumer group whose committed offsets are read.</param>
/// <returns>The report text: a header, one line per partition, and a trailing blank line.</returns>
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
    StringBuilder sb = new StringBuilder();
    sb.AppendLine("\n");
    sb.AppendLine("Consumer Group Lag Report");
    sb.AppendLine("-------------------------");
    ConsumerConfig config = new ConsumerConfig {
        BootstrapServers = bootstrapServers,
        GroupId = consumerGroupName,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
        // The partitions to inspect are fixed here; Committed() below only
        // reports on partitions that have been assigned to this consumer.
        List<TopicPartition> topic_partitions = new List<TopicPartition> {
            new TopicPartition("test3", new Partition(0)),
            new TopicPartition("test3", new Partition(1)),
            new TopicPartition("test3", new Partition(2))
        };

        consumer.Assign(topic_partitions);

        List<TopicPartitionOffset> tpos = consumer.Committed(TimeSpan.FromSeconds(40));
        foreach (TopicPartitionOffset tpo in tpos) {
            WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(40));
            long log_end_offset = w.High.Value;

            // A partition without a committed offset comes back as a special
            // (negative sentinel) offset such as Offset.Unset. Subtracting that
            // from the high watermark would yield a meaningless, inflated lag.
            if (tpo.Offset.IsSpecial) {
                sb.AppendLine($"No committed offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition}; Watermark end offset is {log_end_offset}");
                continue;
            }

            long commited = tpo.Offset.Value;
            long lag = log_end_offset - commited;
            sb.AppendLine($"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {log_end_offset} Lag is: {lag}");
        }

        sb.AppendLine();
    }

    return sb.ToString();
}

【讨论】:

  • 您找到如何将主题映射到消费者组以生成topic_partitions了吗?
【解决方案2】:

我扩展了您的答案,增加了根据集群元数据自动生成主题分区列表的部分。

/// <summary>
/// Produces consumer-group lag reports by combining cluster metadata,
/// committed offsets and partition high watermarks.
/// </summary>
public class Offsets
{
   /// <summary>
   /// Builds a plain-text lag report for the given consumer group across all
   /// non-internal topics of the cluster.
   /// </summary>
   /// <remarks>
   /// Every partition of every topic whose name does not start with "__" is
   /// assigned to a temporary consumer so that the group's committed offsets
   /// can be queried. Partitions for which the group has no committed offset
   /// are left out of the report, so only partitions the group has actually
   /// committed to are listed. Lag is <c>high watermark - committed</c>.
   /// The connection uses <c>SecurityProtocol.Ssl</c>.
   /// </remarks>
   /// <param name="bootstrapServers">Broker list used as <c>BootstrapServers</c>.</param>
   /// <param name="consumerGroupName">Consumer group whose committed offsets are read.</param>
   /// <returns>The report text: a header, one line per reported partition, and a trailing blank line.</returns>
   public string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName)
   {
      var sb = new StringBuilder();
      sb.AppendLine("\n");
      sb.AppendLine("Consumer Group Lag Report");
      sb.AppendLine("-------------------------");

      var config = new ConsumerConfig
      {
         BootstrapServers = bootstrapServers,
         GroupId = consumerGroupName,
         SecurityProtocol = SecurityProtocol.Ssl
      };

      using var adminClient = new AdminClientBuilder(config).Build();
      using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

      var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

      var topicPartitions = new List<TopicPartition>();

      // Skip internal topics such as "__consumer_offsets". The check is a
      // prefix match: a user topic that merely contains "__" somewhere in its
      // name (e.g. "orders__v2") must still be included.
      foreach (var topicMeta in meta.Topics.Where(x => !x.Topic.StartsWith("__", System.StringComparison.Ordinal)))
      {
         topicPartitions.AddRange(topicMeta.Partitions.Select(topicMetaPartition => new TopicPartition(topicMeta.Topic, new Partition(topicMetaPartition.PartitionId))));
      }

      consumer.Assign(topicPartitions);

      List<TopicPartitionOffset> tpos = consumer.Committed(TimeSpan.FromSeconds(40));
      foreach (TopicPartitionOffset tpo in tpos)
      {
         // Because every partition in the cluster was assigned, most entries
         // may belong to topics this group never consumed. Those come back as
         // a special (negative sentinel) offset such as Offset.Unset; using it
         // in the subtraction below would report a meaningless lag.
         if (tpo.Offset.IsSpecial)
         {
            continue;
         }

         WatermarkOffsets w = consumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(40));
         long commited = tpo.Offset.Value;
         long log_end_offset = w.High.Value;
         long lag = log_end_offset - commited;
         sb.AppendLine(
            $"Commited offset for Topic {tpo.TopicPartition.Topic} Partition {tpo.TopicPartition.Partition} is {commited} out of Wantermark end offset {log_end_offset} Lag is: {lag}");
      }

      sb.AppendLine();

      return sb.ToString();
   }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-05-01
    • 2020-06-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多