【Question title】: Deserializing a JSON object from protobuf returns empty
【Posted】: 2019-08-30 13:21:07
【Question】:

I am using scalapb in Spark Scala to parse a bahir MQTT payload that was serialized with protobuf, but the parsed JSON contains only the first JSON object, and the rest are empty.

  • Spark version: 2.3.0
  • Scala version: 2.11.8
  • Protobuf version: 2
  • sparksql-scalapb version: 0.8.0

import spark.implicits._
val parsedData = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

Proto file

syntax = "proto2";
option java_package = "protobuf";

message ParseData {
    required int64 timestamp = 1;
    message METRICS {
        required string name = 1;
        optional int64 timestamp = 2;
        optional string dataType = 3;
        optional double value = 4;
    }
    repeated METRICS metrics = 2;
    required int32 seq = 3;
}

The result I get

+-------------+--------------------+---+
|timestamp    |metrics             |seq|
+-------------+--------------------+---+
|1567158851979|[[T05,,,], [T06,,,]]|54 |
+-------------+--------------------+---+

But the expected result is

+-------------+-----------------------------------------------------------------+---+
|timestamp    |metrics                                                          |seq|
+-------------+-----------------------------------------------------------------+---+
|1567158851979|[[T05,1566920552229,Float,34.56], [T06,1566920552229,Float,32.5]]|54 |
+-------------+-----------------------------------------------------------------+---+

Update-1

The incoming payload message before serialization looks like this:

{
"metrics" : [{
"name" : "T05",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 34.56
},
{
"name" : "T06",
"timestamp" : 1566920552229,
"dataType" : "Float",
"value" : 32.5
}]
}

The MQTT server uses the Eclipse Tahu project, which serializes the payload with protobuf.

Update-2

The code is as follows:

val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .option("username", username)
      .option("password", password)
      .load(brokerUrl)

import spark.implicits._
val parseLines = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

lines.printSchema()

val data = parseLines.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()


data.awaitTermination()

Schema and sample of the streaming data


root
 |-- id: integer (nullable = true)
 |-- topic: string (nullable = true)
 |-- payload: binary (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|id |topic                     |payload                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |timestamp          |
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|0  |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 83 E8 99 99 CE 2D 12 53 0A 3F 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 31 18 D2 AE 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 57 0A 43 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 44 65 6C 74 61 18 EA AF 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 BA A9 99 99 CE 2D 20 03 38 00 4A 00 50 CD E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 EB A9 99 99 CE 2D 20 03 38 00 4A 00 50 F1 D6 82 11 12 4C 0A 38 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 31 73 74 43 6F 6E 74 72 6F 6C 2F 53 69 67 6E 61 6C 73 2F 46 49 52 53 54 5F 43 4F 4E 54 52 4F 4C 5F 54 45 4D 50 5F 45 4E 47 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 9D A2 02 42 12 47 0A 33 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 73 69 67 6E 61 6C 73 2F 4D 6F 74 6F 72 5F 43 75 72 72 65 6E 74 5F 41 6D 70 73 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 22 B7 47 41 18 17]|2019-08-30 17:30:43|
|0  |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 EB EF 99 99 CE 2D 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 A3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 CE E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 D3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 F2 D6 82 11 18 18]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |2019-08-30 17:30:44|
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+

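Thanks for your input.

【Comments】:

  • Do you have a sample of the payload?
  • @cocolapin01 In the question you refer to JSON, but the code shows that you are parsing bytes into a case class. Can you correct the question? This may be a problem with the payload; perhaps data is missing from your input.
  • @thesamet I made some changes to the question. The payload's schema is binary, and I have to use an external parser to parse it correctly. I don't want to read it as JSON because I will be transforming the payload.
  • Can you post an example of the actual payload input you are loading into lines, and the code you use to load it?
  • @thesamet Added the missing information.

Tags: scala apache-spark protocol-buffers scalapb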


【Solution 1】:

Let's try to isolate the problem. We can take Spark out of the picture first.
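
The console sink in the question prints the payload as space-separated hex inside square brackets. To reproduce the parse outside Spark, that text has to become an Array[Byte] again. A minimal sketch of such a conversion, in plain Scala; HexDump and hexToBytes are hypothetical names introduced here for illustration and are not part of Spark or ScalaPB:

```scala
object HexDump {
  /** Converts a dump such as "[08 83 E8]" into the bytes it represents. */
  def hexToBytes(dump: String): Array[Byte] =
    dump.trim
      .stripPrefix("[")
      .stripSuffix("]")
      .trim
      .split("\\s+")
      .filter(_.nonEmpty)                        // tolerate an empty dump
      .map(h => Integer.parseInt(h, 16).toByte)  // "E8" -> 0xE8 as a signed byte
}
```

Pasting a full payload cell from the console output into hexToBytes should give the same bytes as writing them out by hand.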

If we take the payload you provided and try to parse it:

val b: Array[Byte] = Array(0x08, 0x83, 0xE8, 0x99, 0x99, 0xCE, 0x2D, 0x12, 0x53, 0x0A, 0x3F, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x31, 0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x57, 0x0A, 0x43, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x44, 0x65, 0x6C, 0x74, 0x61, 0x18, 0xEA, 0xAF, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x49, 0x0A, 0x35, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x45, 0x6C, 0x61, 0x70, 0x73, 0x65, 0x64, 0x18, 0xBA, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 0x50, 0xCD, 0xE2, 0x82, 0x11, 0x12, 0x4B, 0x0A, 0x37, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x52, 0x65, 0x6D, 0x61, 0x69, 0x6E, 0x69, 0x6E, 0x67, 0x18, 0xEB, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 
0x50, 0xF1, 0xD6, 0x82, 0x11, 0x12, 0x4C, 0x0A, 0x38, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x31, 0x73, 0x74, 0x43, 0x6F, 0x6E, 0x74, 0x72, 0x6F, 0x6C, 0x2F, 0x53, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x46, 0x49, 0x52, 0x53, 0x54, 0x5F, 0x43, 0x4F, 0x4E, 0x54, 0x52, 0x4F, 0x4C, 0x5F, 0x54, 0x45, 0x4D, 0x50, 0x5F, 0x45, 0x4E, 0x47, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9D, 0xA2, 0x02, 0x42, 0x12, 0x47, 0x0A, 0x33, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x4D, 0x6F, 0x74, 0x6F, 0x72, 0x5F, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6E, 0x74, 0x5F, 0x41, 0x6D, 0x70, 0x73, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x22, 0xB7, 0x47, 0x41, 0x18, 0x17).map(_.toByte)

println(ParseData.parseFrom(b).toProtoString)

Then the output is:

timestamp: 1567179043843
metrics {
  name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocation1"
}
metrics {
  name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocationDelta"
}
metrics {
  name: "MixPreparation/BondMixing/signalsMES/mixerTimeElapsed"
}
metrics {
  name: "MixPreparation/BondMixing/signalsMES/mixerTimeRemaining"
}
metrics {
  name: "Finishing/Trio/1stControl/Signals/FIRST_CONTROL_TEMP_ENG"
}
metrics {
  name: "Finishing/Trio/faceSC600/signals/Motor_Current_Amps"
}
seq: 23

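This shows that only the name field in metrics is set; none of the other fields are.

This tells us that the data you are passing to parseFrom does not have the fields you expect.

The next step is to investigate why the producer of these messages does not set those fields in the protos before serializing. The problem appears to be on the producer side.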
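
One way to start that investigation is to look at the raw field tags in the payload. In the protobuf wire format each field begins with a varint tag equal to (field number << 3) | wire type. The sketch below is plain Scala with no ScalaPB dependency; WireTags, readVarint, fieldTags and sampleTail are hypothetical names introduced here. It walks a buffer and lists the (field number, wire type) of each top-level field, using the 18 bytes that follow the name string in the first metrics entry of the sample payload.

```scala
object WireTags {
  /** Reads a base-128 varint starting at `pos`; returns (value, next position). */
  def readVarint(buf: Array[Byte], pos: Int): (Long, Int) = {
    var result = 0L
    var shift = 0
    var i = pos
    var more = true
    while (more) {
      val b = buf(i) & 0xFF
      result |= (b & 0x7FL) << shift
      shift += 7
      i += 1
      more = (b & 0x80) != 0   // high bit set means another byte follows
    }
    (result, i)
  }

  /** Lists (fieldNumber, wireType) for each top-level field in `buf`. */
  def fieldTags(buf: Array[Byte]): List[(Int, Int)] = {
    val out = List.newBuilder[(Int, Int)]
    var pos = 0
    while (pos < buf.length) {
      val (tag, afterTag) = readVarint(buf, pos)
      val fieldNumber = (tag >>> 3).toInt
      val wireType = (tag & 7).toInt
      out += ((fieldNumber, wireType))
      // Skip over the field's value so the next iteration lands on a tag.
      pos = wireType match {
        case 0 => readVarint(buf, afterTag)._2   // varint
        case 1 => afterTag + 8                   // fixed 64-bit
        case 2 =>                                // length-delimited
          val (len, afterLen) = readVarint(buf, afterTag)
          afterLen + len.toInt
        case 5 => afterTag + 4                   // fixed 32-bit
        case other =>
          throw new IllegalArgumentException(s"unsupported wire type $other")
      }
    }
    out.result()
  }

  // Bytes copied from the sample payload: everything after the name
  // string inside the first metrics entry.
  val sampleTail: Array[Byte] = Array(
    0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D,
    0x20, 0x09,
    0x38, 0x00,
    0x4A, 0x00,
    0x65, 0x9A, 0xD9, 0x19, 0x43
  ).map(_.toByte)
}

// WireTags.fieldTags(WireTags.sampleTail)
// → List((3,0), (4,0), (7,0), (9,2), (12,5))
```

For these bytes the walker reports fields 3, 4, 7, 9 and 12. The proto in the question declares field 2 as an int64 (wire type 0), field 3 as a string (wire type 2) and field 4 as a double (wire type 1), so none of the tags in the payload line up with it. That is consistent with parseFrom leaving everything except name unset.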

【Comments】:

  • Thanks @thesamet, it was a problem with the input proto. It has been fixed.