【问题标题】:how to send JSON object to kafka from python client如何从python客户端将JSON对象发送到kafka
【发布时间】:2015-10-27 16:35:07
【问题描述】:

我有一个简单的 JSON 对象,如下所示

d = { 'tag ': 'blah',
  'name' : 'sam',
  'score': 
    {'row1': 100,
      'row2': 200
     }
}

以下是我向 Kafka 发送消息的 python 代码

from kafka import SimpleProducer, KafkaClient
import json 

# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)

我在风暴日志中看到消息正在被接收但它被抛出 元组的转换 null { 这里的 json 结构 } 不知道需要做什么才能解决这个问题?..

【问题讨论】:

  • 请尝试 producer.send_messages(b'message1',d)
  • 那行不通。 d 将是一本字典。我需要将其转换为字符串或字节
  • “为元组抛出转换 null”是什么意思?
  • 没关系我解决了这个问题。我很快就会发布解决方案。
  • @Rahul 你能发布解决方案吗....我需要将 JSON 对象发送到 kafka。

标签: python json apache-kafka kafka-python


【解决方案1】:

Kafka 需要以字节为单位的值

b`some json message`

这是我的简单 Kafka 生产者,它将消息发送到 Kafka 服务器。

import json
from bson import json_util

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(10):
    data = { 'tag ': 'blah',
        'name' : 'sam',
        'index' : i,
        'score': 
            {'row1': 100,
             'row2': 200
        }
    }   
    producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))

这里 json.dumps() 将 json 转换为字符串, encode('utf-8') 将字符串转换为字节数组。

【讨论】:

  • 不应该是producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))(“d”应该是“data”)
【解决方案2】:

以下是我的生产者到 kafka 的代码。我唯一不同的是使用yaml.safe_load 加载json 内容。它以字符串而不是 unicode 的形式返回内容。以下是sn -p

with open('smaller_test_prod.txt') as f:
    for line in f:
        d = yaml.safe_load(line)
        jd = json.dumps(d)
        producer.send_messages(b'zeus_metrics',jd)

在这里,每一行都是存储在文件中的 json 数据。

【讨论】:

  • 为什么要使用 YAML 来解析 JSON?
  • 希望我能对此投反对票。您可以将字符串转换为 utf-8
猜你喜欢
  • 1970-01-01
  • 2013-11-16
  • 2018-02-13
  • 2015-01-28
  • 2015-11-06
  • 2018-10-27
  • 1970-01-01
  • 1970-01-01
  • 2023-03-10
相关资源
最近更新 更多