【问题标题】:Deserialize the AVRO in c#在 C# 中反序列化 AVRO
【发布时间】:2018-05-27 20:21:36
【问题描述】:

我有一个以 JSON 格式编写的架构。我从 kafka 服务器得到一个字符串,如下所示:

\ 0 \ 0 \ 0 \ u00032H45d71580-9781-4d9c-8535-a233ff7c3122 \ nPLANTH45d71580-9781-4d9c-8535-a233ff7c3122 \ nPLANT,2017-12-12T16:34:15GMT \ u001020171212 \ u0018201712121034 \ nthertH1AB5297A-9D28- 4742-A95C-4A4CEED7037D\n假\n假\n交叉\u00021\u00025

现在我尝试反序列化字符串并根据我的 Schema 文件将其转换为 Object。我怎么能在 C# 中做到这一点?有没有我可以使用的库?

我尝试了 Microsoft.Hadoop.Avro。 https://docs.microsoft.com/en-us/azure/hdinsight/hadoop/apache-hadoop-dotnet-avro-serialization#Scenario1 一旦代码运行到:

var actual = avroSerializer.Deserialize(buffer);

它会抛出一个异常: “数组尺寸超出支持范围”

我从 kafka 获取字符串。另一个应用程序产生它,我的应用程序消耗它。该应用程序生成它是用 swift 编写的,他们使用一些 nodejs lib 进行序列化。所以我猜字符串的格式是否重要?

kafka 消息由 Javascript 应用程序生成。他们使用名为 AVSC(Avro for Javascript)的库来序列化字符串。收到消息(字符串)后,我将其转换为字节流,之后我发现这个字节与 AVSC lib 生成的原始字节有点不同。但为什么呢?

【问题讨论】:

标签: c# apache-kafka avro


【解决方案1】:

您可以尝试使用https://github.com/AdrianStrugala/AvroConvert
如果来自 Kafka 的文件只包含数据使用:

var actual = AvroCnvert.DeserializeHeadless<TheModel>(buffer, schema);

您需要确保您的模型和架​​构是正确的。

Avro 是一种数据格式(与 JSON 完全一样)。每个序列化器实现(或语言)都应该相互兼容。

【讨论】:

    【解决方案2】:

    Confluent 的 Java 库(我怀疑是 Swift 应用程序用来写入 Kafka 的库)在序列化为 Avro 的二进制编码时会写入一个魔术字节。见这篇文章:https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format

    他们将其用于版本控制和向后兼容性,详见此处:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

    但是,您使用的 Microsoft.Hadoop.Avro 库在反序列化/序列化时不使用魔术字节。在调用 Deserialize() 之前尝试从流中删除第一个字节。

    【讨论】:

    • 除此之外,如果您使用的是架构注册表,它还会在魔术字节之后附加 4 个字节,即架构 ID 的 int32。
    • 我试图删除第一个字节,以及前 5 个字节,但它仍然给我同样的错误。我检查了字节数组,似乎swift发布的字节是十六进制的,但是我们从kafka得到的字节是十进制的。
    • 十六进制和十进制是字节数组的编码;无论它们如何表示,底层字节都是相同的。 Kafka 只是一个存储数据存储,所以如果您的 Swift 应用程序正在编写一个十六进制编码的字符串,您必须从中获取一个字节数组,试试这个:How do you convert a byte array to a hexadecimal string, and vice versa
    【解决方案3】:

    你应该试试 Microsoft.Hadoop.Avro.Container.AvroContainer 这有一个 CreateGenericReader 方法。类似的东西;

    using (var reader = AvroContainer.CreateGenericReader(buffer))
            {
                while (reader.MoveNext())
                {
                    foreach (dynamic record in reader.Current.Objects) {
                          // Take a look at what you get in the record
                    }
                }
            }
    

    Nuget 包是 Microsoft.Avro.Tools(.Net Core 中的 v0.1.0)

    【讨论】:

    • 我刚试过这个。这是行不通的。它将抛出异常:“在流中的有效 avro 对象容器中。”我现在猜测我从 kafka 得到的字符串格式不正确......
    猜你喜欢
    • 2020-03-13
    • 1970-01-01
    • 2020-03-04
    • 1970-01-01
    • 2017-02-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多