【问题标题】:Number of commits and offset in each partition of a kafka topickafka 主题的每个分区中的提交次数和偏移量
【发布时间】:2015-02-14 10:45:21
【问题描述】:

如何在 known kafka 主题的每个分区中查找提交次数和当前偏移量。我正在使用卡夫卡 v0.8.1.1

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    从您的问题中不清楚,您对哪种偏移量感兴趣。 实际上有三种类型的偏移量:

    1. 主题分区中第一条可用消息的偏移量。使用-2 (最早)作为 GetOffsetShell 工具的 --time 参数
    2. 主题分区中最后一条可用消息的偏移量。使用 -1(最新)作为 --time 范围。
    3. 最后读取/处理的消息偏移量由维护 卡夫卡消费者。高级消费者将每个消费者组的信息存储在 一个内部的 Kafka 主题(曾经是 Zookeeper)并负责 在调用 commit() 或自动提交时保持最新 设置为真。对于简单的消费者,您的代码必须采取 关心管理偏移量。

    除了命令行实用程序,#1 和#2 的偏移信息也可以通过 SimpleConsumer.earliestOrLatestOffset() 获得。

    如果消息数不太大,可以给GetOffsetShell指定一个大的--offsets参数,然后统计工具返回的行数。否则,您可以在 scala/java 中编写一个简单的循环,从最早开始迭代所有可用的偏移量。

    From Kafka documentation:

    Get Offset Shell
    get offsets for a topic
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell
    
    required argument [broker-list], [topic]
    Option Description 
    ------ ----------- 
    --broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to. 
    --max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000) 
    --offsets <Integer: count> number of offsets returned (default: 1)
    --partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default) 
    --time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore  > 
    --topic <topic> REQUIRED: The topic to get offsets from.
    

    【讨论】:

    • 我需要两个功能 1. 监控每个分区的延迟 2. 如果系统完全重新启动(动物园管理员、代理、生产者和消费者),我如何从上次读取/处理的消息偏移中恢复高级消费者
    【解决方案2】:

    关于主题和分区的偏移量,您可以使用kafka.tools.GetOffsetShell。例如使用这些命令(我有主题games):

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1
    

    我将得到games:0:47841,这意味着对于主题games0 分区,我有最新未使用的偏移量47841(最新可用消息)。

    您可以使用-2 查看第一条可用消息。

    【讨论】:

    • 一个警告:如果打开日志压缩,这将不准确。
    • 有没有办法以编程方式获得相同的值?
    【解决方案3】:

    此信息还有助于创建脚本以查看主题分区上的消息数(从命令行)。虽然像 Kafka-Web-Console 这样的工具很不错,但我们中的一些人生活在一个没有 GUI 的世界中。

    这是脚本...使用和修改它需要您自担风险:-)

    #!/bin/bash
    
    topic=$1
    
    if [[ -z "${topic}" ]] ; then
    
        echo "Usage: ${0} <topic>"
        exit 1
    
    fi
    
    
    if [[ -z "${KAFKA_HOME}" ]] ; then
    
        # $KAFKA_HOME not set, using default /kafka
        KAFKA_HOME="/kafka"
    
    fi
    
    if [ ! -d ${KAFKA_HOME} ] ; then
    
        echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]"
        exit 1
    
    fi
    
    cd $KAFKA_HOME
    
    echo
    echo "Topic: ${topic}: "
    
    #
    printf "Partition  Count\n"
    printf "~~~~~~~~~~ ~~~~~~~~~~~~\n"
    
    idx=0
    for msg in `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ${topic} --broker-list localhost:9092 --time -1` ; do
    
        name=`echo ${msg} | awk -F ":" '{print $1}'`
        partition=`echo ${msg} | awk -F ":" '{print $2}'`
        total=`echo ${msg} | awk -F ":" '{print $3}'`
    
        printf "%10d %12d\n" ${partition} ${total}
        idx=$((idx + 1))
    
    done
    
    if [ ${idx} -eq 0 ] ; then
    
        echo "Topic name not found!"
        exit 1
    
    fi
    
    echo
    exit ${rc}
    

    【讨论】:

    • 由于日志保留,每个分区的最新偏移量不一定等于每个分区中当前有多少消息。如果未打开日志压缩,您可以通过从最新的偏移量中减去最早的偏移量来实现该计数。
    【解决方案4】:

    从 0.9.0.x 版本开始,您应该开始使用 kafka.admin.ConsumerGroupCommand 工具。以下是该工具采用的参数

    List all consumer groups, describe a consumer group, or delete consumer group info.
    Option                                  Description
    ------                                  -----------
    --bootstrap-server <server to connect   REQUIRED (only when using new-
      to>                                     consumer): The server to connect to.
    --command-config <command config        Property file containing configs to be
      property file>                          passed to Admin Client and Consumer.
    --delete                                Pass in groups to delete topic
                                              partition offsets and ownership
                                              information over the entire consumer
                                              group. For instance --group g1 --
                                              group g2
                                            Pass in groups with a single topic to
                                              just delete the given topic's
                                              partition offsets and ownership
                                              information for the given consumer
                                              groups. For instance --group g1 --
                                              group g2 --topic t1
                                            Pass in just a topic to delete the
                                              given topic's partition offsets and
                                              ownership information for every
                                              consumer group. For instance --topic
                                              t1
                                            WARNING: Group deletion only works for
                                              old ZK-based consumer groups, and
                                              one has to use it carefully to only
                                              delete groups that are not active.
    --describe                              Describe consumer group and list
                                              offset lag related to given group.
    --group <consumer group>                The consumer group we wish to act on.
    --list                                  List all consumer groups.
    --new-consumer                          Use new consumer.
    --topic <topic>                         The topic whose consumer group
                                              information should be deleted.
    --zookeeper <urls>                      REQUIRED (unless new-consumer is
                                              used): The connection string for the
                                              zookeeper connection in the form
                                              host:port. Multiple URLS can be
                                              given to allow fail-over.
    

    要获取消费者组_Y 的 Topic_X 的偏移量,请使用以下命令

    bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y
    

    响应看起来像

    GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
    consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none
    consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none
    consumerGroup, Topic_X, 2, 801564, 939540, 137976, none
    consumerGroup, Topic_X, 3, 737290, 875262, 137972, none
    consumerGroup, Topic_X, 4, 737288, 875254, 137966, none
    consumerGroup, Topic_X, 5, 737276, 875241, 137965, none
    consumerGroup, Topic_X, 6, 737290, 875251, 137961, none
    consumerGroup, Topic_X, 7, 737290, 875248, 137958, none
    consumerGroup, Topic_X, 8, 737288, 875246, 137958, none
    consumerGroup, Topic_X, 9, 737293, 875251, 137958, none
    consumerGroup, Topic_X, 10, 737289, 875244, 137955, none
    consumerGroup, Topic_X, 11, 737273, 875226, 137953, none
    

    【讨论】:

    • 有没有人整理出如何使用 bin/kafka-consumer-groups.sh--command-config 和较新的 kafka 0.10.x 修改偏移量?
    【解决方案5】:

    假设我们有明天的主题名称是27
    我们的要求是
    Req 1:想知道topic的partition和offset详情。
    Ans : 我们可以使用 GetOffsetShell 命令,如下图所示。

    需求 2:想知道消费者组消耗的偏移量。
    Ans:我们可以使用 ConsumerGroupCommand,如下图所示。

    【讨论】:

      猜你喜欢
      • 2021-02-22
      • 2018-01-24
      • 2019-07-20
      • 2018-07-13
      • 1970-01-01
      • 2018-01-23
      • 1970-01-01
      • 2015-11-21
      相关资源
      最近更新 更多