【问题标题】:Parsing Event Hub messages using spark streaming使用火花流解析事件中心消息
【发布时间】:2019-07-31 21:08:03
【问题描述】:

我正在尝试使用 spark 流和 scala 解析从 Azure blob 文件事件生成的 Azure 事件中心消息。

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


object eventhub {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .appName("Event Hub")
  //.config("spark.some.config.option", "some-value")
  .master("local")
  .getOrCreate()
import spark.implicits._


// Event hub configurations
// Replace values below with yours
val eventHubName = "xxx"
val eventHubNSConnStr = "Endpoint=xxxxx"
val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build

val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
val incomingStream = spark.readStream.format("eventhubs")
                    .options(customEventhubParameters.toMap).load()
incomingStream.printSchema



val testSchema = new StructType()
  //.add("offset", StringType)
  //.add("Time", StringType)
  //.add("Timestamp", LongType)
  .add ("Body", new ArrayType( new StructType()
  .add("topic", StringType)
  .add("subject", StringType)
  .add("eventType", StringType)
  .add("eventTime", StringType)
  .add("id", StringType)
  .add("data", new StructType()
    .add("api", StringType)
    .add("clientRequestId", StringType)
    .add("requestId", StringType)
    .add("eTag", StringType)
    .add("contentType", StringType)
    .add("contentLength", LongType)
    .add("blobType", StringType)
    .add("url", StringType)
    .add("sequencer", StringType)
    .add("storageDiagnostics", new StructType()
      .add("batchId", StringType)))
  .add("dataVersion", StringType)
  .add("metadataVersion", StringType), false))



 // Event Hub message format is JSON and contains "body" field
 // Body is binary, so you cast it to string to see the actual content of the message

val messages = incomingStream.select($"body".cast(StringType)).alias("body")
  //.select(explode($"body")).alias("newbody")
  .select(from_json($"body",testSchema)).alias("newbody")
    .select("newbody.*")

/*
Output of val messages = incomingStream.select($"body".cast(StringType)).alias("body")

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|body                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

*/
    messages.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }

}

这是原始传入流和“主体”的结构

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

root
 |-- body: string (nullable = true)

查看“body”的输出,感觉像是一个数组,似乎需要分解,但“body”数据类型变成了字符串,并且抱怨使用“explode”函数。

当我传递模式时,此时它没有正确解析,因为它是字符串,我不确定结构到底应该是什么以及如何解析 JSON 结构。目前我得到 NULL 输出,因为它显然在 JSON 解析上失败了。任何输入表示赞赏。感谢您的帮助。

【问题讨论】:

    标签: scala apache-spark spark-structured-streaming azure-eventhub


    【解决方案1】:

    根据上面打印的 body 输出,似乎没有名称为 Body 的元素,这就是它返回 null 的原因,请使用下面修改后的架构定义,它应该会有所帮助。

    val testSchema = new StructType()
          .add("topic", StringType)
          .add("subject", StringType)
          .add("eventType", StringType)
          .add("eventTime", StringType)
          .add("id", StringType)
          .add("data", new StructType()
            .add("api", StringType)
            .add("clientRequestId", StringType)
            .add("requestId", StringType)
            .add("eTag", StringType)
            .add("contentType", StringType)
            .add("contentLength", LongType)
            .add("blobType", StringType)
            .add("url", StringType)
            .add("sequencer", StringType)
            .add("storageDiagnostics", new StructType()
              .add("batchId", StringType)))
          .add("dataVersion", StringType)
          .add("metadataVersion", StringType)
    

    如果您的输入负载在数组中包含多个对象,则具有上述架构的from_json 将返回 null。如果您希望数组中有多个对象,那么下面的架构应该会有所帮助。

     val testSchema = new ArrayType(new StructType()
      .add("topic", StringType)
      .add("subject", StringType)
      .add("eventType", StringType)
      .add("eventTime", StringType)
      .add("id", StringType)
      .add("data", new StructType()
        .add("api", StringType)
        .add("clientRequestId", StringType)
        .add("requestId", StringType)
        .add("eTag", StringType)
        .add("contentType", StringType)
        .add("contentLength", LongType)
        .add("blobType", StringType)
        .add("url", StringType)
        .add("sequencer", StringType)
        .add("storageDiagnostics", new StructType()
          .add("batchId", StringType)))
      .add("dataVersion", StringType)
      .add("metadataVersion", StringType),false)
    

    【讨论】:

    • 谢谢。有效。我将“Body”字段与“ArrayType”数据类型字段一起使用的唯一原因是因为在输出中我看到了方括号并正在考虑定义数组,但有些是如何采用常规定义的。即使输出有 ARRAY,但它采用 json 模式而不定义任何数组的原因可能是什么?
    • 没有数组的架构可以工作,因为您的输入包含一个对象的数组,在这种情况下,from_json 函数会忽略顶级数组并将输入视为结构类型,请查看SPARK-19595 了解更多信息。
    猜你喜欢
    • 2022-01-08
    • 2021-07-01
    • 2016-08-13
    • 2014-10-27
    • 2015-01-08
    • 2018-08-09
    • 2018-07-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多