【问题标题】:Json producer in kafka-pythonkafka-python 中的 Json 生产者
【发布时间】:2021-03-09 15:06:11
【问题描述】:

我第一次尝试一个简单的 kafka 生产者,它将逐条记录地从 json 文件中获取数据。 但我遇到了错误。

我的 Json 文件(test.json):

{
  "states": 
  [
    {
      "name": "Alabama",
      "abbreviation": "AL"
    },
    {
      "name": "Alaska",
      "abbreviation": "AK"
    }
  ]
}

我的制作人班:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print('Producer created..............')

with open('/home/ravi/test.json') as f:
    data = json.load(f)

    for state in data['states']:
        producer.send('ECJson', json.dump(state))

但我收到错误:

Producer created..............
Traceback (most recent call last):
  File "prodECJson.py", line 10, in <module>
    producer.send('ECJson', json.dump(state))
TypeError: dump() missing 1 required positional argument: 'fp'

【问题讨论】:

    标签: hadoop apache-kafka kafka-python


    【解决方案1】:

    json.dump 将写入磁盘。你想要的是json.dumps,它将从给定的对象创建一个字符串。 KafkaProducer 的 send 函数需要字节,因此您也必须对字符串进行编码。也可以直接在 KafkaProducer 对象中指定序列化器。

    例如:

    import json
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092', 
                             value_serializer=lambda m: json.dumps(m).encode("utf-8"))
    print('Producer created..............')
    
    with open('/home/ravi/test.json') as f:
        data = json.load(f)
    
        for state in data['states']:
            producer.send('ECJson', state)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-04-08
      • 1970-01-01
      • 2016-06-11
      • 1970-01-01
      • 2018-02-10
      • 2019-05-09
      • 2018-04-28
      相关资源
      最近更新 更多