【Question title】: How to create and match a schema in Scala
【Posted】: 2020-08-24 05:58:58
【Question description】:

Hi, I have a schema as follows

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: long (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- availabletosellQty: long (nullable = true)
|    |    |    |-- distroAvailableQty: long (nullable = true)
|    |    |    |-- itemNumber: long (nullable = true)
|    |    |    |-- itemUPC: string (nullable = true)
|    |    |    |-- ossIndicator: string (nullable = true)
|    |    |    |-- turnAvailableQty: long (nullable = true)
|    |    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |    |-- weightFormatType: string (nullable = true)
|    |    |    |-- whpkRatio: long (nullable = true)

To map this, I created the following schema type

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: integer (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: struct (nullable = true)
|    |    |-- availabletosellQty: long (nullable = true)
|    |    |-- distroAvailableQty: long (nullable = true)
|    |    |-- itemNumber: long (nullable = true)
|    |    |-- itemUPC: string (nullable = true)
|    |    |-- ossIndicator: string (nullable = true)
|    |    |-- turnAvailableQty: long (nullable = true)
|    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |-- weightFormatType: string (nullable = true)
|    |    |-- whpkRatio: long (nullable = true)

by writing something like this

 val testSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode",StringType)
    .add("dcNumber", IntegerType)
    .add("financialReportingGroup",StringType)

    .add("itemList",new StructType(
      Array(
        StructField("availabletosellQty",LongType),
        StructField("distroAvailableQty",LongType),
        StructField("itemNumber", LongType),
        StructField("itemUPC", StringType),
        StructField("ossIndicator",StringType),
        StructField("turnAvailableQty",LongType),
        StructField("unitOfMeasurement",StringType),
        StructField("weightFormatType",StringType),
        StructField("whpkRatio",LongType)))))

But it does not match the schema I receive... What am I doing wrong here?

When I try to populate it with some data, I get null values...

|-- eventObject: struct (nullable = true)
|    |-- baseDivisionCode: string (nullable = true)
|    |-- countryCode: string (nullable = true)
|    |-- dcNumber: long (nullable = true)
|    |-- financialReportingGroup: string (nullable = true)
|    |-- itemList: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- itemNumber: long (nullable = true)
|    |    |    |-- itemUPC: string (nullable = true)
|    |    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |    |-- availabletosellQty: long (nullable = true)
|    |    |    |-- turnAvailableQty: long (nullable = true)
|    |    |    |-- distroAvailableQty: long (nullable = true)
|    |    |    |-- ossIndicator: string (nullable = true)
|    |    |    |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)

|-- baseDivisionCode: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: long (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- itemList: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- itemNumber: long (nullable = true)
|    |    |-- itemUPC: string (nullable = true)
|    |    |-- unitOfMeasurement: string (nullable = true)
|    |    |-- availabletosellQty: long (nullable = true)
|    |    |-- turnAvailableQty: long (nullable = true)
|    |    |-- distroAvailableQty: long (nullable = true)
|    |    |-- ossIndicator: string (nullable = true)
|    |    |-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)

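When I try to flatten it further, it fails because of the array: "Exception in thread "main" org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: `ArrayBuffer(itemList)`;"

This is what I am trying to get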

|-- facilityCountryCode: string (nullable = true)
|-- facilityNum: string (nullable = true)
|-- WMT_CorrelationId: string (nullable = true)
|-- WMT_IdempotencyKey: string (nullable = true)
|-- WMT_Timestamp: string (nullable = true)
|-- countryCode: string (nullable = true)
|-- dcNumber: integer (nullable = true)
|-- financialReportingGroup: string (nullable = true)
|-- baseDivisionCode: string (nullable = true)
|-- itemNumber: integer (nullable = true)
|-- itemUPC: string (nullable = true)
|-- unitOfMeasurement: string (nullable = true)
|-- availabletosellQty: integer (nullable = true)
|-- turnAvailableQty: integer (nullable = true)
|-- distroAvailableQty: integer (nullable = true)
|-- ossIndicator: string (nullable = true)
|-- weightFormatType: string (nullable = true)
|-- kafka_timestamp: timestamp (nullable = true)
|-- year-month-day: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)

This is what I did

val testParsed=TestExploded.select($"exploded.*",$"kafka_timestamp")

val testFlattened=testParsed.select($"eventObject.*",$"kafka_timestamp")

// The $"itemList.*" selection below is the one that raises the AnalysisException.
val test_flattened_further=testFlattened.select($"countryCode",
  $"dcNumber",$"financialReportingGroup",$"baseDivisionCode",$"itemList.*",$"kafka_timestamp")

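【Question comments】:

  • You are converting itemList from array[struct] to struct. How do you intend to do that? By taking the N-th element of the array?
  • No, I need array[struct]... I need all the elements in the list
  • So you only need to convert all the long fields to integers?
  • No, I need to create a schema that matches the one I receive... I am not sure how to add |-- element: struct (containsNull = true) to my schema
  • Your first and second schemas do not match. itemList is an array of structs in the first schema, but just a struct in the second. Could you revisit your input and expected schemas?

Tags: scala dataframe apache-spark schema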


【Solution 1】:

Use ArrayType to specify the array type:

import org.apache.spark.sql.types._

// itemList is an array of structs, so the element struct is wrapped in ArrayType.
val testSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode", StringType)
    .add("dcNumber", LongType)
    .add("financialReportingGroup", StringType)
    .add("itemList", ArrayType(
      new StructType(
        Array(
          StructField("itemNumber", LongType),
          StructField("itemUPC", StringType),
          StructField("unitOfMeasurement", StringType),
          StructField("availabletosellQty", LongType),
          StructField("turnAvailableQty", LongType),
          StructField("distroAvailableQty", LongType),
          StructField("ossIndicator", StringType),
          StructField("weightFormatType", StringType))),
      containsNull = true)))
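
As a rough check that a schema of this shape parses array data, the sketch below applies it with `from_json` to one JSON string. The sample payload, the object name `SchemaMatchSketch`, and the local `SparkSession` settings are assumptions made up for illustration; they are not from the question. A plausible cause of the nulls described in the question is that `from_json` cannot fit a JSON array into a field declared as a struct, although the exact behaviour (null field versus null row) depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object SchemaMatchSketch {
  // Element type of itemList: one struct per item.
  val itemType: StructType = new StructType()
    .add("itemNumber", LongType)
    .add("itemUPC", StringType)
    .add("unitOfMeasurement", StringType)
    .add("availabletosellQty", LongType)
    .add("turnAvailableQty", LongType)
    .add("distroAvailableQty", LongType)
    .add("ossIndicator", StringType)
    .add("weightFormatType", StringType)

  // Same shape as the schema above: itemList is ArrayType(struct), not a bare struct.
  val testSchema: StructType = new StructType()
    .add("eventObject", new StructType()
      .add("baseDivisionCode", StringType)
      .add("countryCode", StringType)
      .add("dcNumber", LongType)
      .add("financialReportingGroup", StringType)
      .add("itemList", ArrayType(itemType, containsNull = true)))

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumed setup).
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("schema-match-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical payload with two items; values are invented.
    val json =
      """{"eventObject":{"baseDivisionCode":"A","countryCode":"US","dcNumber":1,
        |"financialReportingGroup":"US","itemList":[
        |{"itemNumber":10,"itemUPC":"0001"},{"itemNumber":20,"itemUPC":"0002"}]}}""".stripMargin

    val parsed = Seq(json).toDF("value")
      .select(from_json(col("value"), testSchema).as("data"))
      .select("data.*")

    parsed.printSchema()          // itemList should appear as array<struct<...>>
    parsed.show(truncate = false) // fields missing from the JSON come back as null
    spark.stop()
  }
}
```

Defining the element struct separately (`itemType`) is only a readability choice; nesting it inline, as in the answer, builds the same schema.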

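To fully flatten the DataFrame, explode the array of structs and then promote the struct fields to top-level columns with the select("structColName.*") syntax, as follows:

import org.apache.spark.sql.functions.{col, explode}

df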
  .select("eventObject.*")
  .select(
    col("baseDivisionCode"),
    col("countryCode"),
    col("dcNumber"),
    col("financialReportingGroup"),
    explode(col("itemList")).as("explodedItemList"))
  .select(
    col("baseDivisionCode"),
    col("countryCode"),
    col("dcNumber"),
    col("financialReportingGroup"),
    col("explodedItemList.*")
  )
  .printSchema()

This outputs:

root
 |-- baseDivisionCode: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- dcNumber: long (nullable = true)
 |-- financialReportingGroup: string (nullable = true)
 |-- itemNumber: long (nullable = true)
 |-- itemUPC: string (nullable = true)
 |-- unitOfMeasurement: string (nullable = true)
 |-- availabletosellQty: long (nullable = true)
 |-- turnAvailableQty: long (nullable = true)
 |-- distroAvailableQty: long (nullable = true)
 |-- ossIndicator: string (nullable = true)
 |-- weightFormatType: string (nullable = true)
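
The target layout in the question also keeps `kafka_timestamp` and shows the numeric columns as `integer`. The following is a minimal sketch of how the same explode approach could be extended to cover that, assuming the input has a struct column `eventObject` and a `kafka_timestamp` column as in the question. The object name `FlattenSketch`, the helper name `flatten`, and the intermediate column name `item` are hypothetical. The casts assume every value fits in 32 bits.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.IntegerType

object FlattenSketch {
  // Assumes `df` has columns `eventObject` (struct with an itemList array) and `kafka_timestamp`.
  def flatten(df: DataFrame): DataFrame =
    df
      // Promote the fields of eventObject to top-level columns, keeping the timestamp.
      .select(col("eventObject.*"), col("kafka_timestamp"))
      // One output row per array element; rows with a null or empty itemList are dropped.
      .withColumn("item", explode(col("itemList")))
      .select(
        col("countryCode"),
        col("dcNumber").cast(IntegerType).as("dcNumber"),
        col("financialReportingGroup"),
        col("baseDivisionCode"),
        col("item.itemNumber").cast(IntegerType).as("itemNumber"),
        col("item.itemUPC").as("itemUPC"),
        col("item.unitOfMeasurement").as("unitOfMeasurement"),
        col("item.availabletosellQty").cast(IntegerType).as("availabletosellQty"),
        col("item.turnAvailableQty").cast(IntegerType).as("turnAvailableQty"),
        col("item.distroAvailableQty").cast(IntegerType).as("distroAvailableQty"),
        col("item.ossIndicator").as("ossIndicator"),
        col("item.weightFormatType").as("weightFormatType"),
        col("kafka_timestamp"))
}
```

If rows without items must be kept, `explode_outer` can be used in place of `explode`; it emits a single row with null item fields for a null or empty array.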

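【Comments】:

  • Thanks @Duelist, this maps my schema to the incoming schema. But I don't think it gets flattened until I am able to flatten the schema...
  • What do you mean by flattening the schema?
  • I updated the question with the results. Could you take a look at that?
  • So you need to turn the eventObject struct into top-level columns? If so, this can be done with df.select("eventObject.*")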