【问题标题】:python - deserialise avro byte logical type decimal to decimalpython - 将avro字节逻辑类型十进制反序列化为十进制
【发布时间】:2018-09-10 16:56:43
【问题描述】:

我正在尝试使用 python avro 库 (python 2) 读取 Avro 文件。当我使用以下代码时:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryDecoder
reader = DataFileReader(open("filename.avro", "rb"), DatumReader())
schema = reader.meta

然后它会正确读取每一列,除了保留为字节的列,而不是预期的十进制值。

如何将此列转换为预期的十进制值?我注意到文件的元数据将该列标识为“类型”:“字节”,但“逻辑类型”:“十进制”

我在此列的元数据下方发布,以及字节值(预期的实际值都是 1,000 的倍数小于 25,000。该文件是使用 Kafka 创建的。

元数据:

 {
                            "name": "amount",
                            "type": {
                                "type": "bytes",
                                "scale": 8,
                                "precision": 20,
                                "connect.version": 1,
                                "connect.parameters": {
                                    "scale": "8",
                                    "connect.decimal.precision": "20"
                                },
                                "connect.name": "org.apache.kafka.connect.data.Decimal",
                                "logicalType": "decimal"
                            }
                        }

字节值:

'E\xd9d\xb8\x00'
'\x00\xe8\xd4\xa5\x10\x00'
'\x01\x17e\x92\xe0\x00'
'\x01\x17e\x92\xe0\x00'

预期值:

3,000.00
10,000.00
12,000.00
5,000.00

我需要在 AWS 上部署的 Lambda 函数中使用它,因此不能使用 fast_avro 或其他使用 C 而不是纯 Python 的库。

查看以下链接: https://pypi.org/project/fastavro/ https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html

【问题讨论】:

    标签: python binary deserialization avro apache-kafka-connect


    【解决方案1】:

    为此,您需要使用fastavro 库。 avroavro-python3 库在发布时都不支持逻辑类型。

    【讨论】:

    • 我现在意识到我不能使用 fastavro,因为我需要部署在 AWS 上,所以只能使用纯 Python 编写的库。
    • 在这种情况下,我认为您目前唯一的选择可能是分叉 fastavro 并删除 Cython 部分。该库同时具有 python 和 cython 实现,因此如果您删除 cython 部分,您应该留下一个可以工作的纯 python 解决方案。
    • 谢谢。删除 cpython 部分效果很好。
    • @oli5679 能否分享一下您的解决方案?
    【解决方案2】:

    您可以使用它来将字节字符串解码为十进制。这会将值填充到下一个最高字节结构,以便所有可能的值都适合。

    import struct
    from decimal import Decimal
    
    def decode_decimal(value, num_places):
        value_size = len(value)
        for fmt in ('>b', '>h', '>l', '>q'):
            fmt_size = struct.calcsize(fmt)
            if fmt_size >= value_size:
                padding = b'\x00' * (fmt_size - value_size)
                int_value = struct.unpack(fmt, padding + value)[0]
                scale = Decimal('1') / (10 ** num_places)
                return Decimal(int_value) * scale
        raise ValueError('Could not unpack value')
    

    例如:

    >>> decode_decimal(b'\x00\xe8\xd4\xa5\x10\x00', 8)
    Decimal('10000.00000000')
    >>> decode_decimal(b'\x01\x17e\x92\xe0\x00', 8)
    Decimal('12000.00000000')
    >>> decode_decimal(b'\xb2\xb4\xe7\x84', 4)  # Negative value
    Decimal('-129676.7100')
    

    参考:

    https://avro.apache.org/docs/1.10.2/spec.html#Decimal https://docs.python.org/3/library/struct.html#format-characters

    【讨论】:

      【解决方案3】:

      由于某种原因,fastavro 包在同一个文件中默认使用。 我最终使用了下面的代码。仍然不确定是否有办法直接使用 avro 库来解决这个问题,或者反序列化上面问题中发布的输出。

      import fastavro
      with open("filename.avro", 'rb') as fo: 
          for record in fastavro.reader(fo): 
              print(record) 
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2023-03-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-09-12
        • 1970-01-01
        • 2017-05-23
        • 1970-01-01
        相关资源
        最近更新 更多