【Question title】: How do I decode an Avro message in Python?
【Posted】: 2020-10-13 18:34:10
【Question】:

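I'm having trouble decoding an Avro message in Python (3.6.11). I have tried both the avro and fastavro packages, so I suspect the problem may be that the bytes I'm providing are incorrect.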

Using avro:

from avro.io import DatumReader, BinaryDecoder
import avro.schema
from io import BytesIO

schema = avro.schema.parse("""
    {
        "type": "record",
        "name": "User",
        "namespace": "example.avro",
        "fields": [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "favorite_number",
                "type": [
                    "int",
                    "null"
                ]
            },
            {
                "name": "favorite_color",
                "type": [
                    "string",
                    "null"
                ]
            }
        ]
    }
""")

rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
decoder = BinaryDecoder(rb)
reader = DatumReader(schema)
msg = reader.read(decoder)
print(msg)

Traceback (most recent call last):
  File "main.py", line 36, in <module>
    msg = reader.read(decoder)
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 626, in read
    return self.read_data(self.writers_schema, self.readers_schema, decoder)
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 698, in read_data
    return self.read_record(writers_schema, readers_schema, decoder)
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 898, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 638, in read_data
    return self.read_union(writers_schema, readers_schema, decoder)
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 854, in read_union
    index_of_schema = int(decoder.read_long())
  File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 240, in read_long
    b = ord(self.read(1))
TypeError: ord() expected a character, but string of length 0 found

Using fastavro:

from fastavro import schemaless_reader, parse_schema
from io import BytesIO

schema = parse_schema(
    {
        "type": "record",
        "name": "User",
        "namespace": "example.avro",
        "fields": [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "favorite_number",
                "type": [
                    "int",
                    "null"
                ]
            },
            {
                "name": "favorite_color",
                "type": [
                    "string",
                    "null"
                ]
            }
        ]
    }
)

rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
msg = schemaless_reader(rb, schema)
print(msg)

Traceback (most recent call last):
  File "main.py", line 33, in <module>
    msg = schemaless_reader(rb, schema)
  File "fastavro/_read.pyx", line 969, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 981, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 652, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 510, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 644, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 429, in fastavro._read.read_union
  File "fastavro/_read.pyx", line 200, in fastavro._read.read_long
StopIteration

I don't know whether the message I'm encoding is malformed, or whether the problem lies in the encoding itself. Any suggestions?
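Why both libraries fail at the same spot can be sketched by hand. Avro's binary encoding stores a string as a zigzag varint length followed by the bytes, so a binary reader interprets the first JSON byte `{` (0x7b) as a length. The minimal sketch below, assuming the standard zigzag varint rule from the Avro specification, decodes that byte to -62. `BytesIO.read` with a negative size returns everything that is left, so plausibly the reader swallows the rest of the buffer as the "name" string and then finds no bytes for the next field's union index, which matches both `ord()` getting an empty string and fastavro's `StopIteration`. `read_zigzag_long` is a hypothetical helper written for illustration, not part of either library.

```python
from io import BytesIO


def read_zigzag_long(stream):
    """Hypothetical helper: decode one Avro zigzag varint from a byte stream."""
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("ran out of bytes while reading a varint")
        b = byte[0]
        result |= (b & 0x7F) << shift  # low 7 bits carry the payload
        if not b & 0x80:  # high bit clear: this was the last byte
            break
        shift += 7
    # Undo the zigzag mapping (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...)
    return (result >> 1) ^ -(result & 1)


payload = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')

# A binary reader expects the "name" string to start with its length.
length = read_zigzag_long(payload)
print(length)  # -62

# BytesIO.read with a negative size reads everything that is left...
rest = payload.read(length)
print(len(rest))  # 41

# ...so decoding the next field's union index finds no bytes at all.
try:
    read_zigzag_long(payload)
except EOFError as exc:
    print("EOF:", exc)
```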

【Comments】:

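  • Shouldn't you be reading an Avro file? You are currently trying to read data that is not in Avro format. You could write the content to an Avro file and then read it back.
  • I'm using kafka-python to consume from a topic, but I keep running into problems decoding the messages. I tried to isolate the decoding problem, but you may be right that I can't do it this way. When I write to a buffer with either library, I can decode it. The problem may actually be in my kafka-python consumer, or even in the messages being produced to my topic.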

Tags: python apache-kafka avro fastavro


【Solution 1】:

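I'll speak to fastavro, since that's the one I know best.

Your rb variable should be the Avro binary you are trying to read (not the data itself). To get an example of that binary, you can write: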

from fastavro import schemaless_writer

rb = BytesIO()
schemaless_writer(rb, schema, {"name": "Alyssa", "favorite_number": 256})
rb.getvalue()  # b'\x0cAlyssa\x00\x80\x04\x02'

Then you can do what you were trying to do and read the resulting binary:

rb = BytesIO(b'\x0cAlyssa\x00\x80\x04\x02')
data = schemaless_reader(rb, schema)
# {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}

【Comments】:

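  • Not sure where my head was. I'm using AKHQ to produce messages on a Kafka topic, and I wrongly assumed that AKHQ used the schema from the registry to encode the messages to Avro. Once I read the comments on my question, I realized that wasn't happening. I added a producer that uses schemaless_writer to do what you described, and that solved the problem.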