【问题标题】:Differentiating between binary encoded Avro and JSON messages区分二进制编码的 Avro 和 JSON 消息
【发布时间】:2018-08-06 14:56:02
【问题描述】:
我正在使用 python 来读取来自各种主题的消息。一些主题的消息以纯 JSON 编码,而另一些主题则使用 Avro 二进制序列化和融合模式注册表。
当我收到一条消息时,我需要知道它是否需要被解码。目前我只依赖于二进制编码消息以MAGIC_BYTE 开头的事实,该值为零:
from confluent_kafka.cimpl import Consumer
consumer = Consumer(config)
consumer.subsrcibe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.values()[0] == 0:
# It is binary encoded
else:
# It is json
我想知道有没有更好的方法来做到这一点?
【问题讨论】:
标签:
python
apache-kafka
avro
confluent-schema-registry
【解决方案1】:
你可以得到你消息的字节0-5,然后
magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]
然后,在您的注册表中查找 GET /schemas/{schema_id},并在收到 200 响应代码时缓存 ID + 架构(如果需要)。
否则,消息是 JSON,或者生产者已将其数据发送到不同的注册表(如果您的环境中有多个注册表)。 注意:这意味着数据仍然可以是 Avro
【解决方案2】:
您可以先通过 REST 简单地查询模式注册表,然后为那里注册的主题构建本地缓存。然后,当您尝试解码来自特定主题的消息时,只需将该主题与该列表的内容进行比较。如果它在那里,你就知道它已经被解码了。
当然,这只有在所有 Avro 编码的主题都使用 Schema Registry 时才有效。如果您收到的 Avro 编码消息未在 Schema Registry 中注册,那么它将不起作用。