【问题标题】:Spark Structured Streaming Databricks Event Hub Schema Defining issueSpark Structured Streaming Databricks 事件中心架构定义问题
【发布时间】:2019-06-26 20:50:38
【问题描述】:

我在定义 json 文档的结构时遇到问题。

现在我正在尝试在 streamread 上执行相同的架构。

val jsonSchema = StructType([ StructField("associatedEntities", struct<driver:StringType,truck:StringType>, True), 
                          StructField("heading", StringType, True), 
                          StructField("location", struct<accuracyType:StringType,captureDateTime:StringType,cityStateCode:StringType,description:StringType,latitude:DoubleType,longitude:DoubleType,quality:StringType,transmitDateTime:StringType>, True), 
                          StructField("measurements", array<struct<type:StringType,uom:StringType,value:StringType>>, True), 
                          StructField("source", struct<entityType:StringType,key:StringType,vendor:StringType>, True), 
                          StructField("speed", DoubleType, True)])

val df = spark
 .readStream
 .format("eventhubs")
 //.schema(jsonSchema) 
 .options(ehConf.toMap)
 .load()

当我在笔记本中运行此单元格时“:15:错误:简单表达式的非法开始 val jsonSchema = StructType([ StructField("associatedEntities", struct, True),"

编辑:目标是将数据放入数据框中。我可以从事件中心消息的正文中获取 json 字符串,但如果我无法让架构工作,我不确定从那里做什么。

【问题讨论】:

  • 我将如何处理 array> 在那个 .add 样式中?
  • 事件中心似乎不需要架构,我正在尝试采用具有 json 对象的二进制主体 col,然后对其进行结构化

标签: scala apache-spark databricks spark-structured-streaming


【解决方案1】:

由于您的架构定义,您会收到错误消息。架构定义应如下所示:

import org.apache.spark.sql.types._

val jsonSchema = StructType(
                        Seq(StructField("associatedEntities", 
                                        StructType(Seq(
                                          StructField("driver", StringType), 
                                          StructField ("truck", StringType)
                                        ))),
                            StructField("heading", StringType),
                            StructField("measurements", ArrayType(StructType(Seq(StructField ("type", StringType), StructField ("uom", StringType), StructField("value", StringType)))))
                           )
                         )

您可以使用以下命令仔细检查架构:

jsonSchema.printTreeString

返回架构:

root
 |-- associatedEntities: struct (nullable = true)
 |    |-- driver: string (nullable = true)
 |    |-- truck: string (nullable = true)
 |-- heading: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- uom: string (nullable = true)
 |    |    |-- value: string (nullable = true)

如 cmets 中所述,您将获得二进制数据。所以首先你得到原始数据框:

val rawData = spark.readStream
  .format("eventhubs")
  .option(...)
  .load()

你必须:

  • 将数据转换为字符串
  • 解析嵌套的json
  • 并将其展平

用解析后的数据定义数据框:

val parsedData = rawData
   .selectExpr("cast (Body as string) as json")
   .select(from_json($"json", jsonSchema).as("data"))
   .select("data.*")

【讨论】:

  • 我正在研究你的答案,我很好奇 seq 函数是做什么的?
猜你喜欢
  • 2020-03-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-17
  • 2020-03-19
  • 2021-04-13
  • 2020-09-12
相关资源
最近更新 更多