【问题标题】:Stream CSV data in Kafka-Python在 Kafka-Python 中流式传输 CSV 数据
【发布时间】:2020-10-07 01:46:22
【问题描述】:

我正在使用Kafka-Python 将 CSV 数据发送到 Kafka 主题。消费者成功发送和接收数据。现在我正在尝试连续流式传输 csv 文件,添加到文件中的任何新条目都应自动发送到 Kafka 主题。任何建议都会对 CSV 文件的连续流式传输有所帮助

下面是我现有的代码,

   from kafka import KafkaProducer
   import logging
   from json import dumps, loads
   import csv
   logging.basicConfig(level=logging.INFO)


   producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda 
   K:dumps(K).encode('utf-8'))

   with open('C:/Hadoop/Data/Job.csv', 'r') as file:
   reader = csv.reader(file, delimiter = '\t')
       for messages in reader:
       producer.send('Jim_Topic', messages)
       producer.flush()

【问题讨论】:

  • 必须是 Python 吗?对于摄取/输出,Kafka Connect 通常是一种更好的方法。如果这有用,我可以根据它提供答案
  • @RobinMoffatt,是的,请使用 Kafka Connect 给我答案,我会使用它

标签: apache-kafka kafka-producer-api kafka-python


【解决方案1】:

Kafka Connect(Apache Kafka 的一部分)是一种在 Kafka 和其他系统(包括平面文件)之间进行摄取和输出的好方法。

您可以使用Kafka Connect SpoolDir connector 将 CSV 文件流式传输到 Kafka。从Confluent Hub 安装它,然后为您的源文件提供配置:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'

有关更多示例和详细信息,请参阅 this blog

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-29
    • 1970-01-01
    • 2023-03-26
    • 2019-05-19
    • 1970-01-01
    • 2019-05-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多