【发布时间】:2021-03-31 00:07:16
【问题描述】:
我正在接收作为 mqtt 消息的序列化 (AVRO) 数据。 消息看起来像这样 Objavro.codecnullavro.schemaº{"type": "record", "name": "User", "namespace": "example.avro", "fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}]} Œpq+±)žJ@xX·,Alyssa €Ben redŒpq+±)žJ@xX·
我必须使用 Python3 和已知架构 user.avsc 反序列化这些数据 -
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
反序列化的数据应该是这样的
{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}
在https://avro.apache.org/docs/current/gettingstartedpython.html 给出的示例中,数据是从 DataFileWriter/Reader 方法写入/读取的,但是当消息到达时,如果能够像消息到达那样即时执行此操作会很棒,python 代码会反序列化数据并打印它。
已经处理了 MQTT 订阅逻辑,现在只打印传入的消息,我想用传入的消息打印反序列化的数据。
我尝试了以下(反序列化逻辑):
import avro.schema
from avro.io import DatumReader, DatumWriter
import io
schema = avro.schema.parse(open("user.avsc", "rb").read())
# message passed here is incoming message
bytes_reader = io.BytesIO(bytes(message, encoding='utf-8'))
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
data = reader.read(decoder)
print(data)
上面的代码失败(TypeError: ord() 期望一个字符,但找到长度为 0 的字符串)因为我无法找出正确的格式来用作 reader 的参数.read() 方法。我使用 io.BytesIO 的原因是因为数据以字符串形式到达,我无法传递字符串,并且显然来自 apache 页面的示例以二进制格式读取数据并使用相同的格式进行反序列化。
谢谢
【问题讨论】: