【问题标题】:How to deserialize avro data from mqtt message?如何从 mqtt 消息中反序列化 avro 数据?
【发布时间】:2021-03-31 00:07:16
【问题描述】:

我正在接收作为 mqtt 消息的序列化 (AVRO) 数据。 消息看起来像这样 Objavro.codecnullavro.schemaº{"type": "record", "name": "User", "namespace": "example.avro", "fields": [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, {"type": ["string", "null"], "name": "favorite_color"}]} Œpq+±)žJ@xX·,Alyssa €Ben redŒpq+±)žJ@xX·

我必须使用 Python3 和已知架构 user.avsc 反序列化这些数据 -

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

反序列化的数据应该是这样的

{u'favorite_color': None, u'favorite_number': 256, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'favorite_number': 7, u'name': u'Ben'}

https://avro.apache.org/docs/current/gettingstartedpython.html 给出的示例中,数据是从 DataFileWriter/Reader 方法写入/读取的,但是当消息到达时,如果能够像消息到达那样即时执行此操作会很棒,python 代码会反序列化数据并打印它。

已经处理了 MQTT 订阅逻辑,现在只打印传入的消息,我想用传入的消息打印反序列化的数据。

我尝试了以下(反序列化逻辑):

import avro.schema
from avro.io import DatumReader, DatumWriter
import io

schema = avro.schema.parse(open("user.avsc", "rb").read())
# message passed here is incoming message
bytes_reader = io.BytesIO(bytes(message, encoding='utf-8'))
decoder = avro.io.BinaryDecoder(bytes_reader)

reader = avro.io.DatumReader(schema)
data = reader.read(decoder)
print(data)

上面的代码失败(TypeError: ord() 期望一个字符,但找到长度为 0 的字符串)因为我无法找出正确的格式来用作 reader 的参数.read() 方法。我使用 io.BytesIO 的原因是因为数据以字符串形式到达,我无法传递字符串,并且显然来自 apache 页面的示例以二进制格式读取数据并使用相同的格式进行反序列化。

谢谢

【问题讨论】:

    标签: python mqtt avro


    【解决方案1】:

    如果您从 MQTT 获得的消息是字符串格式(而不是字节),那么您可能无法反序列化它。如果您看到的是字符串格式的 avro 二进制文件,您将无法将其编码为 UTF-8 并对其进行反序列化。您需要实际的二进制文件。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-06
      • 1970-01-01
      • 2016-09-06
      • 2020-11-22
      • 2019-07-12
      • 2020-08-29
      • 2019-04-11
      • 2019-08-03
      相关资源
      最近更新 更多