【发布时间】:2020-08-24 05:58:58
【问题描述】:
你好,我有一个模式如下
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: long (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabletosellQty: long (nullable = true)
| | | |-- distroAvailableQty: long (nullable = true)
| | | |-- itemNumber: long (nullable = true)
| | | |-- itemUPC: string (nullable = true)
| | | |-- ossIndicator: string (nullable = true)
| | | |-- turnAvailableQty: long (nullable = true)
| | | |-- unitOfMeasurement: string (nullable = true)
| | | |-- weightFormatType: string (nullable = true)
| | | |-- whpkRatio: long (nullable = true)
为了映射这个,我创建了以下模式类型
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: integer (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: struct (nullable = true)
| | |-- availabletosellQty: long (nullable = true)
| | |-- distroAvailableQty: long (nullable = true)
| | |-- itemNumber: long (nullable = true)
| | |-- itemUPC: string (nullable = true)
| | |-- ossIndicator: string (nullable = true)
| | |-- turnAvailableQty: long (nullable = true)
| | |-- unitOfMeasurement: string (nullable = true)
| | |-- weightFormatType: string (nullable = true)
| | |-- whpkRatio: long (nullable = true)
写这样的东西
val testSchema = new StructType()
.add("eventObject", new StructType()
.add("baseDivisionCode", StringType)
.add("countryCode",StringType)
.add("dcNumber", IntegerType)
.add("financialReportingGroup",StringType)
.add("itemList",new StructType(
Array(
StructField("availabletosellQty",LongType),
StructField("distroAvailableQty",LongType),
StructField("itemNumber", LongType),
StructField("itemUPC", StringType),
StructField("ossIndicator",StringType),
StructField("turnAvailableQty",LongType),
StructField("unitOfMeasurement",StringType),
StructField("weightFormatType",StringType),
StructField("whpkRatio",LongType)))))
但它与我收到的架构不匹配...我在这方面做错了什么?
当我尝试用一些数据填充时,我得到空值...
|-- eventObject: struct (nullable = true)
| |-- baseDivisionCode: string (nullable = true)
| |-- countryCode: string (nullable = true)
| |-- dcNumber: long (nullable = true)
| |-- financialReportingGroup: string (nullable = true)
| |-- itemList: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- itemNumber: long (nullable = true)
| | | |-- itemUPC: string (nullable = true)
| | | |-- unitOfMeasurement: string (nullable = true)
| | | |-- availabletosellQty: long (nullable = true)
| | | |-- turnAvailableQty: long (nullable = true)
| | | |-- distroAvailableQty: long (nullable = true)
| | | |-- ossIndicator: string (nullable = true)
| | | |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: long (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- itemList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemNumber: long (nullable = true)
| | |-- itemUPC: string (nullable = true)
| | |-- unitOfMeasurement: string (nullable = true)
| | |-- availabletosellQty: long (nullable = true)
| | |-- turnAvailableQty: long (nullable = true)
| | |-- distroAvailableQty: long (nullable = true)
| | |-- ossIndicator: string (nullable = true)
| | |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
当我进一步尝试压平它时,它的错误原因是数组
"线程 "main" org.apache.spark.sql.AnalysisException 中的异常:只能星号扩展结构数据类型。属性:ArrayBuffer(itemList);"
试图得到它
|-- facilityCountryCode: string (nullable = true)
|-- facilityNum: string (nullable = true)
|-- WMT_CorrelationId: string (nullable = true)
|-- WMT_IdempotencyKey: string (nullable = true)
|-- WMT_Timestamp: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: integer (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- itemNumber: integer (nullable = true)
|-- itemUPC: string (nullable = true)
|-- unitOfMeasurement: string (nullable = true)
|-- availabletosellQty: integer (nullable = true)
|-- turnAvailableQty: integer (nullable = true)
|-- distroAvailableQty: integer (nullable = true)
|-- ossIndicator: string (nullable = true)
|-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- year-month-day: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
这就是我所做的
val testParsed=TestExploded.select($"exploded.*",$"kafka_timestamp")
val testFlattened=testParsed.select($"eventObject.*",$"kafka_timestamp")
val test_flattened_further=testFlattened.select($"countryCode",
$"dcNumber",$"financialReportingGroup",$"baseDivisionCode",**$"itemList.*"**,$"kafka_timestamp")
【问题讨论】:
-
您将
itemList从array[struct]转换为struct。你应该怎么做?通过从数组中获取第 N 个元素? -
不,我需要数组[struct]...我需要列表中的所有元素
-
所以只需要将
long类型字段全部转换为integers? -
不,我需要创建一个与我收到的架构匹配的架构...我不确定如何添加 |-- element: struct (containsNull = true) 在我的架构中
-
您的第一个和架构不匹配。 itemList 首先是一个结构数组,而在第二个模式中只是结构。您能否重新审视您的输入和预期架构?
标签: scala dataframe apache-spark schema