fengguozhong

Kafka简介

💡 Tips:本文主要介绍在Linux系统中安装和使用Lafka的操作步骤。

安装Kafka

访问Kafka官网,下载安装包版本(https://kafka.apache.org/downloads),下载 kafka_2.12-3.3.2.tgz,前面的 2.12 是 Scala的版本号,后面的3.3.2是kafka的版本号。https://downloads.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz

下载后,按照命令进行安装:

cd /home/guozhong
wget https://downloads.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz
tar -zxvf kafka_2.12-3.3.2.tgz -C /opt/modules/app
cd /opt/modules/app
mv kafka_2.12-3.3.2  kafka
chown -R hadoop ./kafka

启动Kafka

安装完成后,首先要启动Kafka。登录Linux系统(本教程使用已经创建的hadoop用户),打开一个终端,输入命令启动Zookeeper服务:

cd /opt/modules/app/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

注意:执行完上面的命令后,终端窗口会返回一堆信息,然后停止不动了,没有回到shell命令提示符状态,这时,不要误以为是程序任务死掉了,而是Zookeeper服务器已经启动了,正处于运行状态下。所以,千万不要关闭该终端窗口,一旦关闭,Zookeeper服务就停止了。

请另打开一个终端,然后输入以下命令启动kafka服务:

cd /opt/modules/app/kafka
./bin/kafka-server-start.sh config/server.properties

同样的,执行上面的命令后,终端窗口也返回一堆信息,然后就会停止不动,没有回到shell命令提示符状态,同样不要以为死机了,而是Kafka服务器已经启动,正处于运行状态。所以不要关闭终端窗口。否则,关闭后Kafka服务就停止了。

当然,要想kafka在后台运行,可以采用在结尾增加"&"的命令:

cd /opt/modules/app/kafka
./bin/kafka-server-start.sh config/server.properties &

这样,Kafka启动后就会在后台运行,即使关闭了终端窗口,服务也一直在运行。

查看当前启动的服务

[root@hadoop01 kafka]# jps
2711 Kafka
4600 Jps
2255 QuorumPeerMain

创建Topic

再打开第三个终端,然后输入以下命令,创建一个自定义名称为“testsender”的Topic:(这2.12版本之后的创建方式,老版本的创建方式命令不同,可以查看⚠️常见问题 说明)

./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092
Created topic testsender.

可执行如下命令,查看"testsender"的Topic是否已成功创建:

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
testsender

启动生产者和消费者

启动生产者发送消息:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testsender
>hello
>123456

启动消费者接收消息:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testsender --from-beginning

接收到消息:
hello
123456

⚠️常见问题

解决zookeeper is not a recognized option问题

出现如下错误,这时不要慌

 

分析这个问题可能有两点:

第一点:当我们使用如下命令创建 topic 时,会报此错误。比如要创建一个名称为“testsender”的topic,当前安装的版本是2.12,这时我们要使用新版本的创建topic命令来执行,老版本是报以上错误的。

老版本的创建topic方式

./bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 
--topic testsender

我们采用新版本的创建topic方式

./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092

第二点:如果采用新版本的创建方式还是报错,这时要考虑的问题多数在配置文件上,这个问题多数是因为:kafka 的配置文件和 zookeeper 的配置文件,我们都按照默认,并没有修改,这里可能需要修改就是这两个文件:kafka-server-start.sh 和 zookeeper-server-start.sh中默认的启动内存,初始值有点大,如果服务器的配置比较低的话,会报内存不足的错误,这个问题不太容易发现,这里我们可以把启动内存调小一点。

kafka-server-start.sh文件: Xms1G 调整为 Xms128M

默认:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

修改后:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms128M"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

zookeeper-server-start.sh文件:Xmx512M 调整为 Xms128M

默认:

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}

修改为:

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms128M"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}

接下来重启服务,再试一下:

./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092
Created topic testsender.

分类:

技术点:

相关文章: