【发布时间】:2019-07-31 21:08:03
【问题描述】:
我正在尝试使用 spark 流和 scala 解析从 Azure blob 文件事件生成的 Azure 事件中心消息。
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object eventhub {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Event Hub")
//.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate()
import spark.implicits._
// Event hub configurations
// Replace values below with yours
val eventHubName = "xxx"
val eventHubNSConnStr = "Endpoint=xxxxx"
val connStr = ConnectionStringBuilder(eventHubNSConnStr).setEventHubName(eventHubName).build
val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5)
val incomingStream = spark.readStream.format("eventhubs")
.options(customEventhubParameters.toMap).load()
incomingStream.printSchema
val testSchema = new StructType()
//.add("offset", StringType)
//.add("Time", StringType)
//.add("Timestamp", LongType)
.add ("Body", new ArrayType( new StructType()
.add("topic", StringType)
.add("subject", StringType)
.add("eventType", StringType)
.add("eventTime", StringType)
.add("id", StringType)
.add("data", new StructType()
.add("api", StringType)
.add("clientRequestId", StringType)
.add("requestId", StringType)
.add("eTag", StringType)
.add("contentType", StringType)
.add("contentLength", LongType)
.add("blobType", StringType)
.add("url", StringType)
.add("sequencer", StringType)
.add("storageDiagnostics", new StructType()
.add("batchId", StringType)))
.add("dataVersion", StringType)
.add("metadataVersion", StringType), false))
// Event Hub message format is JSON and contains "body" field
// Body is binary, so you cast it to string to see the actual content of the message
val messages = incomingStream.select($"body".cast(StringType)).alias("body")
//.select(explode($"body")).alias("newbody")
.select(from_json($"body",testSchema)).alias("newbody")
.select("newbody.*")
/*
Output of val messages = incomingStream.select($"body".cast(StringType)).alias("body")
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|body |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"topic":"A1","subject":"A2","eventType":"A3","eventTime":"2019-07-26T17:00:32.4820786Z","id":"1","data":{"api":"PutBlob","clientRequestId":"A4","requestId":"A5","eTag":"A6","contentType":"A7","contentLength":496,"blobType":"BlockBlob","url":"https://test.blob.core.windows.net/test/20190726125719.csv","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1"}]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
messages.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
.awaitTermination()
}
}
这是原始传入流和“主体”的结构
root
|-- body: binary (nullable = true)
|-- partition: string (nullable = true)
|-- offset: string (nullable = true)
|-- sequenceNumber: long (nullable = true)
|-- enqueuedTime: timestamp (nullable = true)
|-- publisher: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- systemProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
root
|-- body: string (nullable = true)
查看“body”的输出,感觉像是一个数组,似乎需要分解,但“body”数据类型变成了字符串,并且抱怨使用“explode”函数。
当我传递模式时,此时它没有正确解析,因为它是字符串,我不确定结构到底应该是什么以及如何解析 JSON 结构。目前我得到 NULL 输出,因为它显然在 JSON 解析上失败了。任何输入表示赞赏。感谢您的帮助。
【问题讨论】:
标签: scala apache-spark spark-structured-streaming azure-eventhub