一、搭建单节点单broker的Kafka集群
注意:请打开不同的终端分别执行以下步骤
1.复制安装包到/usr/local目录下,解压缩,重命名(或者软链接),配置环境变量
[root@hadoop ~]# cd /usr/local/
[root@hadoop local]# tar xzvf kafka_2.11-2.0.0.tgz
[root@hadoop local]# mv kafka_2.11-2.0.0 kafka
[root@hadoop local]# ln -s kafka_2.11-2.0.0 kafka #软链接与上一条重命名二选一,执行一条即可
[root@hadoop local]# vi /etc/profile
添加变量 export KAFKA_HOME=/usr/local/kafka
在PATH后添加 :$KAFKA_HOME/bin
[root@hadoop local]# source /etc/profile
[root@hadoop kafka]# echo $KAFKA_HOME #查看环境变量
/usr/local/kafka
2.启动服务器
启动zookeeper
[root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties
[root@hadoop kafka]# jps #打开另一个终端查看是否启动成功
3892 Jps
3566 QuorumPeerMain
启动kafka
[root@hadoop kafka]# kafka-server-start.sh config/server.properties
3.创建topic
#创建一个分区,一个副本的主题
#副本数无法通过 kafka-topics.sh --alter 修改(如需调整只能借助 kafka-reassign-partitions.sh 重新分配副本),建议在创建主题时就指定好
[root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[root@hadoop kafka]# kafka-topics.sh --list --zookeeper localhost:2181 #列出主题
test
可以通过zk的客户端观察zk的数据结构
[root@hadoop kafka]# zkCli.sh -server localhost:2181 #进入zk客户端
Connecting to localhost:2181
2018-07-31 14:27:24,876 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT
2018-07-31 14:27:24,879 [myid:] - INFO [main:Environment@100] - Client environment:host.name=hadoop
2018-07-31 14:27:24,880 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_11
2018-07-31 14:27:24,882 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2018-07-31 14:27:24,882 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/java/jre
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.9.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:.:/usr/java/lib/dt.jar:/usr/java/lib/tools.jar:/usr/java/jre/lib
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/local/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-514.el7.x86_64
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
2018-07-31 14:27:24,883 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/kafka
2018-07-31 14:27:24,888 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@67424e82
Welcome to ZooKeeper!
2018-07-31 14:27:25,037 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-07-31 14:27:25,131 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-07-31 14:27:25,153 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10000ded7830002, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/test
{"version":1,"partitions":{"0":[0]}}
cZxid = 0x22
ctime = Tue Jul 31 14:22:42 CST 2018
mZxid = 0x22
mtime = Tue Jul 31 14:22:42 CST 2018
pZxid = 0x24
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 36
numChildren = 1
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/test/partitions
[0]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x26
ctime = Tue Jul 31 14:22:42 CST 2018
mZxid = 0x26
mtime = Tue Jul 31 14:22:42 CST 2018
pZxid = 0x26
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
[zk: localhost:2181(CONNECTED) 9] quit #退出
Quitting...
2018-07-31 15:01:53,761 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x10000ded7830002 closed
2018-07-31 15:01:53,789 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x10000ded7830002
[root@hadoop kafka]#