【发布时间】:2020-10-07 01:46:22
【问题描述】:
我正在使用Kafka-Python 将 CSV 数据发送到 Kafka 主题。消费者成功发送和接收数据。现在我正在尝试连续流式传输 csv 文件,添加到文件中的任何新条目都应自动发送到 Kafka 主题。任何建议都会对 CSV 文件的连续流式传输有所帮助
下面是我现有的代码,
from kafka import KafkaProducer
import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda
K:dumps(K).encode('utf-8'))
with open('C:/Hadoop/Data/Job.csv', 'r') as file:
reader = csv.reader(file, delimiter = '\t')
for messages in reader:
producer.send('Jim_Topic', messages)
producer.flush()
【问题讨论】:
-
必须是 Python 吗?对于摄取/输出,Kafka Connect 通常是一种更好的方法。如果这有用,我可以根据它提供答案
-
@RobinMoffatt,是的,请使用 Kafka Connect 给我答案,我会使用它
标签: apache-kafka kafka-producer-api kafka-python