安装kafka版本: kafka_2.11-1.1.0.tgz

kafka解压在opt目录下(opthadoop用户下的目录)

KAFKA---34

因为zookeeper.properties注释实在太多,所以我通过grep命令查找到kafka非注释的字符串,并把它追加到 zk.properties

KAFKA---34

cat zookeeper.properties | grep -v '#'  >> zk.properties  通过这个命令将追加到zk.properties中

KAFKA---34

修改文件的路径为你zk的路径

KAFKA---34

回到kafka的目录下 输入命令 启动zookeeper      bin/zookeeper-server-start.sh config/zk.properties

KAFKA---34

启动完成zookeeper后,另起一个渠道,zookeeper相同,筛选一下server.properties并把它追加到kafka1.properties

KAFKA---34

我们需要

启动kafka服务器(broker

KAFKA---34

创建主题

KAFKA---34

查看你的主题有哪些

KAFKA---34

生产者在主题里放入消息

KAFKA---34

消费者接受消息:

KAFKA---34

flume连接kafka(新建一个文件,我是a4 cp过来 在添加了sink里的那一段):

KAFKA---34

启动flume

./bin/flume-ng agent -c conf -f conf/a5.conf -n a5 -Dflume.root.logger=INFO,console

在接收目录里添加文件,kafka就可以接受到内容,如果传进文件没自动生成后缀为COMPLETED,flume就会报错而kafka也接收不到。


在pcharm里写消费者和生产者,通过xftp传到虚拟机里。

pcharm里面需要在线安装kafka:

KAFKA---34

直接点+号

KAFKA---34

KAFKA---34

KAFKA---34

这样我们pcharm里面的kafka就安装完成了。

我们在pcharm里面写上生产者和消费者的代码,通过xftp传到根目录下:

生产者:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer=KafkaProducer(
    bootstrap_servers='localhost:9092')
while True:
    msg=input("输入消息或bye退出")
    if 'bye'==msg:
        print("欢迎使用")
        break
    producer.send("cctv1",msg.encode())
    producer.flush()

消费者:
from kafka import KafkaConsumer
consumer = KafkaConsumer("cctv1",
                         bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg.value.decode())

但是我们在虚拟机里面也是要安装kafka的。命令: pip install kafka

KAFKA---34

通过命令python3 文件名:就可以运行该Python文件了

KAFKA---34

我们在生产者里的数据,消费者就可以接收的了。

KAFKA---34

相关文章:

  • 2021-10-02
  • 2021-08-03
  • 2022-02-18
  • 2021-10-26
  • 2022-12-23
  • 2022-12-23
  • 2021-10-02
  • 2021-07-30
猜你喜欢
  • 2021-07-24
  • 2021-11-13
  • 2022-12-23
  • 2021-07-16
  • 2021-11-10
  • 2021-07-21
相关资源
相似解决方案