【发布时间】:2015-02-14 10:45:21
【问题描述】:
如何在 known kafka 主题的每个分区中查找提交次数和当前偏移量。我正在使用卡夫卡 v0.8.1.1
【问题讨论】:
标签: apache-kafka
如何在 known kafka 主题的每个分区中查找提交次数和当前偏移量。我正在使用卡夫卡 v0.8.1.1
【问题讨论】:
标签: apache-kafka
从您的问题中不清楚,您对哪种偏移量感兴趣。 实际上有三种类型的偏移量:
除了命令行实用程序,#1 和#2 的偏移信息也可以通过 SimpleConsumer.earliestOrLatestOffset() 获得。
如果消息数不太大,可以给GetOffsetShell指定一个大的--offsets参数,然后统计工具返回的行数。否则,您可以在 scala/java 中编写一个简单的循环,从最早开始迭代所有可用的偏移量。
Get Offset Shell
get offsets for a topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore >
--topic <topic> REQUIRED: The topic to get offsets from.
【讨论】:
关于主题和分区的偏移量,您可以使用kafka.tools.GetOffsetShell。例如使用这些命令(我有主题games):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1
我将得到games:0:47841,这意味着对于主题games 和0 分区,我有最新未使用的偏移量47841(最新可用消息)。
您可以使用-2 查看第一条可用消息。
【讨论】:
此信息还有助于创建脚本以查看主题分区上的消息数(从命令行)。虽然像 Kafka-Web-Console 这样的工具很不错,但我们中的一些人生活在一个没有 GUI 的世界中。
这是脚本...使用和修改它需要您自担风险:-)
#!/bin/bash
topic=$1
if [[ -z "${topic}" ]] ; then
echo "Usage: ${0} <topic>"
exit 1
fi
if [[ -z "${KAFKA_HOME}" ]] ; then
# $KAFKA_HOME not set, using default /kafka
KAFKA_HOME="/kafka"
fi
if [ ! -d ${KAFKA_HOME} ] ; then
echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]"
exit 1
fi
cd $KAFKA_HOME
echo
echo "Topic: ${topic}: "
#
printf "Partition Count\n"
printf "~~~~~~~~~~ ~~~~~~~~~~~~\n"
idx=0
for msg in `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ${topic} --broker-list localhost:9092 --time -1` ; do
name=`echo ${msg} | awk -F ":" '{print $1}'`
partition=`echo ${msg} | awk -F ":" '{print $2}'`
total=`echo ${msg} | awk -F ":" '{print $3}'`
printf "%10d %12d\n" ${partition} ${total}
idx=$((idx + 1))
done
if [ ${idx} -eq 0 ] ; then
echo "Topic name not found!"
exit 1
fi
echo
exit ${rc}
【讨论】:
从 0.9.0.x 版本开始,您应该开始使用 kafka.admin.ConsumerGroupCommand 工具。以下是该工具采用的参数
List all consumer groups, describe a consumer group, or delete consumer group info.
Option Description
------ -----------
--bootstrap-server <server to connect REQUIRED (only when using new-
to> consumer): The server to connect to.
--command-config <command config Property file containing configs to be
property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
Pass in groups with a single topic to
just delete the given topic's
partition offsets and ownership
information for the given consumer
groups. For instance --group g1 --
group g2 --topic t1
Pass in just a topic to delete the
given topic's partition offsets and
ownership information for every
consumer group. For instance --topic
t1
WARNING: Group deletion only works for
old ZK-based consumer groups, and
one has to use it carefully to only
delete groups that are not active.
--describe Describe consumer group and list
offset lag related to given group.
--group <consumer group> The consumer group we wish to act on.
--list List all consumer groups.
--new-consumer Use new consumer.
--topic <topic> The topic whose consumer group
information should be deleted.
--zookeeper <urls> REQUIRED (unless new-consumer is
used): The connection string for the
zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
要获取消费者组_Y 的 Topic_X 的偏移量,请使用以下命令
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y
响应看起来像
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none
consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none
consumerGroup, Topic_X, 2, 801564, 939540, 137976, none
consumerGroup, Topic_X, 3, 737290, 875262, 137972, none
consumerGroup, Topic_X, 4, 737288, 875254, 137966, none
consumerGroup, Topic_X, 5, 737276, 875241, 137965, none
consumerGroup, Topic_X, 6, 737290, 875251, 137961, none
consumerGroup, Topic_X, 7, 737290, 875248, 137958, none
consumerGroup, Topic_X, 8, 737288, 875246, 137958, none
consumerGroup, Topic_X, 9, 737293, 875251, 137958, none
consumerGroup, Topic_X, 10, 737289, 875244, 137955, none
consumerGroup, Topic_X, 11, 737273, 875226, 137953, none
【讨论】:
bin/kafka-consumer-groups.sh 和 --command-config 和较新的 kafka 0.10.x 修改偏移量?