【Question Title】: spark scala convert a nested dataframe to nested dataset
【Posted】: 2020-08-24 20:23:20
【Question】:

I have a nested dataframe "inputFlowRecordsAgg" which has the following schema:

root
 |-- FlowI.key: string (nullable = true)
 |-- FlowS.minFlowTime: long (nullable = true)
 |-- FlowS.maxFlowTime: long (nullable = true)
 |-- FlowS.flowStartedCount: long (nullable = true)
 |-- FlowI.DestPort: integer (nullable = true)
 |-- FlowI.SrcIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.DestIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.L4Protocol: byte (nullable = true)
 |-- FlowI.Direction: byte (nullable = true)
 |-- FlowI.Status: byte (nullable = true)
 |-- FlowI.Mac: string (nullable = true)

and I want to convert it into a nested Dataset of the following case classes:

case class InputFlowV1(val FlowI: FlowI,
                       val FlowS: FlowS)

case class FlowI(val Mac: String,
                 val SrcIP: IPAddress,
                 val DestIP: IPAddress,
                 val DestPort: Int,
                 val L4Protocol: Byte,
                 val Direction: Byte,
                 val Status: Byte,
                 var key: String = "")

case class FlowS(var minFlowTime: Long,
                 var maxFlowTime: Long,
                 var flowStartedCount: Long)
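
IPAddress is not defined in the question. A plausible definition consistent with the bytes: binary fields in the schema above (an assumption for illustration, not the asker's actual class) would be:

case class IPAddress(bytes: Array[Byte])  // hypothetical: encodes as struct<bytes: binary>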

But when I try to convert it using inputFlowRecordsAgg.as[InputFlowV1], I get:

org.apache.spark.sql.AnalysisException: cannot resolve '`FlowI`' given input columns: [FlowI.DestIP, FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

A commenter asked for my complete code; here it is:

import org.apache.spark.sql.functions.{column, first, max, min, sum}

def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1],
                    @transient spark: SparkSession): Dataset[InputFlowV1] = {

  val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "FlowI.key")
    .agg(min("FlowS.minFlowTime") as "FlowS.minFlowTime",
         max("FlowS.maxFlowTime") as "FlowS.maxFlowTime",
         sum("FlowS.flowStartedCount") as "FlowS.flowStartedCount",
         first("FlowI.Mac") as "FlowI.Mac",
         first("FlowI.SrcIP") as "FlowI.SrcIP",
         first("FlowI.DestIP") as "FlowI.DestIP",
         first("FlowI.DestPort") as "FlowI.DestPort",
         first("FlowI.L4Protocol") as "FlowI.L4Protocol",
         first("FlowI.Direction") as "FlowI.Direction",
         first("FlowI.Status") as "FlowI.Status")

  inputFlowRecordsAgg.printSchema()

  inputFlowRecordsAgg.as[InputFlowV1]
}

【Comments】:

  • Could you post your full code, or some sample data matching your schema?
  • I've updated my answer; try adding the select statement to your code and let me know if it doesn't work... and drop all your "FlowI." and "FlowS." prefixes.
  • Any reason not to use map and build a new InputFlowV1 inside it? (A sketch of this approach appears at the end of the post.)

Tags: scala dataframe apache-spark apache-spark-dataset


【Solution 1】:

The reason is that your case class schema does not match the actual data schema; compare it with the case class schema shown below. Once the data schema matches what the case classes expect, the conversion will work.
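
To see the mismatch concretely: the dotted names in the question's printSchema output are literal, flat column names produced by the aliases in the agg, not fields of top-level FlowI/FlowS structs. A quick sketch to verify this (assuming the usual functions import):

import org.apache.spark.sql.functions.col

// Each entry prints as one flat name such as "FlowI.key" -- there is
// no top-level "FlowI" column for the encoder to resolve.
inputFlowRecordsAgg.columns.foreach(println)

// A column whose name contains a literal dot must be backtick-escaped:
inputFlowRecordsAgg.select(col("`FlowI.key`")).show(5)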

This is the schema your case classes actually describe:

scala> df.printSchema
root
 |-- FlowI: struct (nullable = true)
 |    |-- Mac: string (nullable = true)
 |    |-- SrcIP: struct (nullable = true)
 |    |    |-- bytes: binary (nullable = true)
 |    |-- DestIP: struct (nullable = true)
 |    |    |-- bytes: binary (nullable = true)
 |    |-- DestPort: integer (nullable = false)
 |    |-- L4Protocol: byte (nullable = false)
 |    |-- Direction: byte (nullable = false)
 |    |-- Status: byte (nullable = false)
 |    |-- key: string (nullable = true)
 |-- FlowS: struct (nullable = true)
 |    |-- minFlowTime: long (nullable = false)
 |    |-- maxFlowTime: long (nullable = false)
 |    |-- flowStartedCount: long (nullable = false)
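
If you want to reproduce the schema above yourself, one way (a sketch, assuming the question's case classes are in scope) is to print the schema of the case class encoder directly:

import org.apache.spark.sql.Encoders

// Prints the tree a Dataset[InputFlowV1] expects: top-level FlowI and
// FlowS struct columns rather than flat dotted names.
Encoders.product[InputFlowV1].schema.printTreeString()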

Try changing your code as shown below; it should work now.

import spark.implicits._  // for the $"..." column syntax

val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "key")
  .agg(min("FlowS.minFlowTime") as "minFlowTime",
       max("FlowS.maxFlowTime") as "maxFlowTime",
       sum("FlowS.flowStartedCount") as "flowStartedCount",
       first("FlowI.Mac") as "Mac",
       first("FlowI.SrcIP") as "SrcIP",
       first("FlowI.DestIP") as "DestIP",
       first("FlowI.DestPort") as "DestPort",
       first("FlowI.L4Protocol") as "L4Protocol",
       first("FlowI.Direction") as "Direction",
       first("FlowI.Status") as "Status")
  // added line: wrap the flat columns back into the FlowI/FlowS structs
  // the encoder expects; adjust the column lists to your schema
  .select(struct($"key", $"Mac", $"SrcIP", $"DestIP", $"DestPort", $"L4Protocol", $"Direction", $"Status").as("FlowI"),
          struct($"flowStartedCount", $"minFlowTime", $"maxFlowTime").as("FlowS"))

【Comments】:

  • Thanks @Srinivas, it works. Although I was expecting some mechanism other than select and struct; I suspect it adds extra performance overhead compared to a direct cast from the dataframe to the dataset (that's why I used "FlowI.*" and "FlowS.*").
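
For reference, here is a minimal sketch of the map-based alternative suggested in the question's comments. It builds each InputFlowV1 by hand and skips select/struct entirely; it assumes the flat, dot-named columns produced by the question's original agg (Row.getAs matches field names exactly, so the literal dots are fine) and the hypothetical IPAddress case class shown earlier. Null handling is omitted for brevity:

import org.apache.spark.sql.{Dataset, Row}
import spark.implicits._  // provides the product encoder for InputFlowV1

val typed: Dataset[InputFlowV1] = inputFlowRecordsAgg.map { row =>
  // Struct columns come back as nested Rows; binary fields as Array[Byte].
  val srcIP  = IPAddress(row.getAs[Row]("FlowI.SrcIP").getAs[Array[Byte]]("bytes"))
  val destIP = IPAddress(row.getAs[Row]("FlowI.DestIP").getAs[Array[Byte]]("bytes"))
  InputFlowV1(
    FlowI(row.getAs[String]("FlowI.Mac"), srcIP, destIP,
          row.getAs[Int]("FlowI.DestPort"), row.getAs[Byte]("FlowI.L4Protocol"),
          row.getAs[Byte]("FlowI.Direction"), row.getAs[Byte]("FlowI.Status"),
          row.getAs[String]("FlowI.key")),
    FlowS(row.getAs[Long]("FlowS.minFlowTime"),
          row.getAs[Long]("FlowS.maxFlowTime"),
          row.getAs[Long]("FlowS.flowStartedCount")))
}

Note that a typed map forces a full per-row deserialization, so despite the performance concern in the comment above, the select/struct version is usually at least as fast as this one.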