【问题标题】:Spark unmarshal proto bytes into readable formatSpark将原始字节解组为可读格式
【发布时间】:2021-06-13 23:10:43
【问题描述】:

我的 Spark 应用程序从 Kafka 接收二进制数据。数据作为字节原始消息发送到 Kafka。原型消息是:

message Any {
  string type_url=1;
  bytes value=2;
}

使用 ScalaPB 库,我可以将 Any 消息反序列化为其原始格式。如何将值从字节反序列化为可读格式? SerializationUtils 不起作用。 这就是 Any 消息在反序列化后的样子。

#+-----------------------------------------|
#| type_url           | value              |
#+-----------------------------------------|
#|type.googleapis.c...|[0A 8D D8 04 0A 1...|
#+-----------------------------------------|

该值仍然是其字节格式。用 SerializationUtils 反序列化后数据不正确。

#+-----------------+
#|value            |
#+-----------------+
#|2020-09-04T10:...|
#+-----------------+

还有其他选择吗?有没有办法将字节反序列化为 String、Struct 或 String Json?

我使用带有 udf 的 ScalaPBs 示例将字节反序列化为 Any 消息。

val parseCloud = ProtoSQL.udf { bytes: Array[Byte] => CloudEvent.parseFrom(bytes) }

字节值带有 SerializationUtils 的 udf 如下所示。

val parseBytes = ProtoSQL.udf {bytes: Array[Byte] => deserialize(bytes)}

【问题讨论】:

    标签: scala apache-spark apache-kafka protocol-buffers scalapb


    【解决方案1】:

    如果知道Any中的消息类型,可以使用unpack方法进行反序列化。

    val unpackAny = ProtoSQL.udf { any: com.google.protobuf.any.Any => any.unpack[MyMessage] }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-09-24
      • 2019-03-15
      • 1970-01-01
      • 1970-01-01
      • 2019-03-24
      • 2015-11-23
      • 1970-01-01
      相关资源
      最近更新 更多