【问题标题】:Deserializing Flume Events in C# via Avro通过 Avro 在 C# 中反序列化 Flume 事件
【发布时间】:2013-11-13 10:43:03
【问题描述】:

我已经设置了一个 Flume 服务,它可以监控 Netcat 或以 Exec 作为源来跟踪日志,诸如此类。我使用 Memory 作为通道,Avro 作为接收器(文档中指定了 Thrift,但似乎在 Flume 1.3 或 1.4 中不起作用)

我已经设置了一个 C# 套接字服务器来接收消息,并且我得到了一个字节流。如果我使用 Encoding.UTF8.GetString(buffer) 读取它们,那么我可以看到如下内容:

"\0\0\0\0\0\0\0\0\00�����Tt������5\ne\0�����Tt������5\ne\0\0appendBatch\0\0�\0�127.0.0.1 - - [12/Nov/2013:22:42:50 +0000] \"GET /docs/appdev/index.html HTTP/1.1\" 200 7645\0�127.0.0.1 - - [12/Nov/2013:22:44:07 +0000] \"GET /docs/appdev/introduction.html HTTP/1.1\" 200 8619\0�127.0.0.1 - - [12/Nov/2013:22:44:09 +0000] \"GET /docs/appdev/installation.html HTTP/1.1\" 200 9045\0�127.0.0.1 - - [12/Nov/2013:22:44:12 +0000] \"GET /docs/appdev/deployment.html HTTP/1.1\" 200 18800\0�127.0.0.1 - - [12/Nov/2013:22:49:07 +0000] \"GET /docs/appdev/source.html HTTP/1.1\" 200 24554\0�127.0.0.1 - - [12/Nov/2013:22:50:38 +0000] \"GET /docs/appdev/processes.html HTTP/1.1\" 200 30743\0�127.0.0.1 - - [12/Nov/2013:22:51:39 +0000] \"GET /docs/appdev/sample/ HTTP/1.1\" 200 1852\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /sample HTTP/1.1\" 404 963\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /favicon.ico HTTP/1.1\" 200 21630\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:23:02:13 +0000] \"GET /sample HTTP/1.1\" 404 963\0"

显然我正在获取数据,但我想正确反序列化它而不是进行某种正则表达式提取。我可以看到有一个官方的 Avro C# 库,还有一个带有反序列化库的 Microsoft Hadoop 库。我创建了一个本地对象来反序列化:

[DataContract]
public class AvroEvent
{
    [DataMember]
    public byte[] Body { get; set; }
}

并尝试反序列化:

  client = serverSocket.EndAccept(result);
  var myNetworkStream = new NetworkStream(client);
  myNetworkStream.Read(buffer, 0, size);
  var avro = new AvroSerializer(typeof(AvroEvent));
  var deser = avro.Deserialize(myNetworkStream);

然后我得到这个错误:

  System.InvalidOperationException was unhandled
  HResult=-2146233079
  Message=Unexpected number of bytes.
  Source=Microsoft.Hadoop.Avro

我几乎肯定会以错误的方式处理这一切,而且我敢肯定人们会告诉我不要使用 C#,但我在 Google 上几乎没有资源,所以如果有人else 实际上已经做到了这一点,并为我指明了正确的方向,我将不胜感激

托比

【问题讨论】:

  • 你能澄清对myNetworkStream.Read 的调用在做什么吗?目前,您似乎试图在反序列化之前丢弃 size 字节。如果这是您的意图,我会为此添加评论。
  • 哦,不确定,我对此一无所知,我需要一个字节流从套接字提供给 Avro 反序列化器。我会再试一次,但是一旦我对流进行了排序,那么可能会出现一些 Avro/C# 问题。还是不...
  • 我真正的需要是将实时日志数据输出到 .net 客户端,在那里我可以最有效地管理如何处理它 - 结果答案是“logstash”,而不是 Flume .. ..

标签: c# flume avro


【解决方案1】:

Flume 使用RPC mechanism 来传达事件。如果选择 Avro,那么 Flume 依赖于 Avro RPC,即 not supported by Microsoft's Avro Library(如 新增功能 中所述),因为它仅用作序列化框架。

从技术上讲,Deserialize() 方法期望流具有以下数据(以位为单位):

11[size of byte array encoded in variable-length zig zag][actual byte] (*)

您收到的错误可能是因为收到的数据有不同的wire-format


* 起始1 是必需的,因为库的版本0.8.4951.5418 将每个类型封装在null (0) 和类型(1) 的联合中,因此第一个1 用于记录AvroEvent 和第二个1 用于字段Body。此行为可在最新版本 1.1.0.5 中配置。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-05-27
    • 2017-02-12
    • 2020-03-13
    • 2021-03-14
    • 1970-01-01
    • 2021-06-09
    • 1970-01-01
    • 2020-03-04
    相关资源
    最近更新 更多