【问题标题】:Read Avro format in kafka by Python3通过 Python3 在 kafka 中读取 Avro 格式
【发布时间】:2020-10-12 14:09:01
【问题描述】:

我试试这个代码:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema=avro.schema.parse(open('ff.avsc','rb').read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

...
decode(message.value)

message.value 是我来自 kafka 的消息

出现错误

NameError: name 'BinaryDecoder' is not defined

【问题讨论】:

    标签: python python-3.x apache-kafka avro


    【解决方案1】:

    关于您的错误,我看不到该类的导入,因此可以解释...

    基于seek(5),那么您似乎正在使用 Schema Registry 编码数据...这意味着您需要使用正确的库。

    安装

    pip install confluent-kafka[avro]

    消费

    https://github.com/confluentinc/confluent-kafka-python#usage

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-05-15
      • 2020-03-17
      • 2021-12-28
      • 2017-05-06
      • 1970-01-01
      • 1970-01-01
      • 2018-11-04
      • 2019-06-09
      相关资源
      最近更新 更多