【问题标题】:how to send Json file to kafka from Python如何从 Python 将 Json 文件发送到 kafka
【发布时间】:2023-03-10 10:43:01
【问题描述】:

我有一个如下所示的 json 文件,我想从 python 发送到 kafka。

Json 文件

 filename = 'External_Risk_{}.json'.format(date.today().strftime("%Y%m%d"))
    d =df.to_json(filename, orient='records')

发送到 Kafka

from kafka import SimpleProducer, KafkaClient
import json 
from kafka import KafkaProducer
producer =KafkaProducer(bootstrap_servers='xxx.xxx.xxx.xxx')
jd = json.dumps(d)
producer.send_messages(b'message1',jd)

但它不起作用。将 json 文件发送到 Kafka 的正确方法是什么。

【问题讨论】:

  • 你为什么使用熊猫?如果你有一个数据框,你应该单独发送每一行,而不是一次发送全部
  • @cricket_007,kakfa 中的预期输出是 json,我在使用 pandas 进行许多数据操作后创建了 json。那么我是否应该使用您的方法将数据帧发送到 kakfa。

标签: python json python-3.x pandas apache-kafka


【解决方案1】:

试试这个

from confluent_kafka import Producer
import json

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('topic', json.dumps({"demo": "message"}))

【讨论】:

  • 使用 pip install confluent-kafka 安装 confluent kafka
  • 不是 kafka 上您要转储 json 消息的主题。
  • 像你的例子'message1'是主题
  • 在我的情况下,我如何调整你的代码,我将 json 文件保存到d
  • 只需将 p.produce('topic', jd) 放入 'jd' 变量中即可。
【解决方案2】:

您应该问如何将文件加载到字符串中,然后您只是将字符串发送到 Kafka

import json 
from kafka import KafkaProducer
producer =KafkaProducer(bootstrap_servers='xxx.xxx.xxx.xxx')

with open(filename) as f:
    data = json.load(f)
    producer.send_message(topic, data.encode('utf-8')

【讨论】:

    【解决方案3】:

    你可以通过简单的方式发送一个字典:

    producer = KafkaProducer(
        acks='all',
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: bytes(json.dumps(v, default=str).encode('utf-8'))
    )
    

    我在 lambda 中返回 json 之前调用 bytes() 方法

    【讨论】:

      猜你喜欢
      • 2015-10-27
      • 1970-01-01
      • 1970-01-01
      • 2019-05-07
      • 2016-02-16
      • 2021-12-27
      • 1970-01-01
      • 1970-01-01
      • 2019-08-13
      相关资源
      最近更新 更多