[Title] Spark Scala: select certain columns in a dataframe as a map
[Posted] 2020-11-07 21:48:54
[Question]

I have a dataframe df and a list of column names, and I want to select those columns from the dataframe as a single map column.

I tried the following to build the map:

// In spark-shell (spark.implicits._ is already in scope)
import org.apache.spark.sql.functions.{col, lit, map}

val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value", "label")

val cols = List("from_value", "to_value")

// Hard-codes the first two entries of `cols` as map keys and values
df.select(
  map(
    lit(cols(0)), col(cols(0)),
    lit(cols(1)), col(cols(1))
  ).as("mapped")
).show(false)

Output:

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+

However, I see a few problems with this approach, for example:

  • The list of column names can contain anywhere from 0 to 3 names; with fewer than two, the code above throws an IndexOutOfBoundsException.
  • The order in which the column names appear matters; the map keys must preserve that order.
  • Column values can be null and need to be coalesced to empty strings.
  • A column named in the list may not exist in df.

Is there an elegant way to handle the scenarios above without getting too verbose?

[Comments]

    Tags: scala dataframe apache-spark


    [Solution 1]

    You can use the following function, mappingExpr, to select certain columns of a dataframe as a map:

    import org.apache.spark.sql.functions.{col, lit, map, when}
    import org.apache.spark.sql.{Column, DataFrame}

    def mappingExpr(columns: Seq[String], dataframe: DataFrame): Column = {
      // Replace a null column value with an empty string
      def getValue(columnName: String): Column =
        when(col(columnName).isNull, lit("")).otherwise(col(columnName))

      map(
        columns
          .filter(dataframe.columns.contains)                                    // ignore names not present in the dataframe
          .flatMap(columnName => Seq(lit(columnName), getValue(columnName))): _* // key, value, key, value, ...
      ).as("mapped")
    }
    

    So, given your example data:

    > val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
    > val cols = List("from_value","to_value")
    > 
    > df.select(mappingExpr(cols, df)).show(false)
    
    +------------------------------------+
    |mapped                              |
    +------------------------------------+
    |[from_value -> 66, to_value -> xyz1]|
    |[from_value -> 67, to_value -> abc1]|
    |[from_value -> 68, to_value -> fgr1]|
    |[from_value -> 69, to_value -> yte1]|
    |[from_value -> 70, to_value -> erx1]|
    |[from_value -> 71, to_value -> ter1]|
    +------------------------------------+
    

    Detailed explanation

    The main idea of the function is to turn the list of column names into pairs, where the first element of each pair is the column name as a literal column and the second is the (null-safe) column value. These pairs are then flattened and the result is passed as varargs to Spark's map SQL function.
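
    For illustration (a hand-written sketch, not part of the original answer), the flatMap expansion for cols = List("from_value", "to_value") produces the same varargs you would pass to map by hand:

    import org.apache.spark.sql.functions.{col, lit, map, when}

    // Equivalent to mappingExpr(List("from_value", "to_value"), df)
    val expanded = map(
      lit("from_value"), when(col("from_value").isNull, lit("")).otherwise(col("from_value")),
      lit("to_value"),   when(col("to_value").isNull, lit("")).otherwise(col("to_value"))
    ).as("mapped")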

    Now let's look at the different constraints.

    The list of column names can contain 0 to 3 names

    Because the map entries are built by iterating over the column list, the number of column names does not change anything. In particular, passing an empty list of column names raises no error:

    > val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
    > val cols = List.empty[String]
    >
    > df.select(mappingExpr(cols, df)).show(false)
    +------+
    |mapped|
    +------+
    |[]    |
    |[]    |
    |[]    |
    |[]    |
    |[]    |
    |[]    |
    +------+
    

    The map keys must preserve the order

    This is the trickiest one. Usually, when you create a map, the order is not preserved because of the way a map is implemented. In Spark, however, the order appears to be preserved, so it only depends on the order of the column name list. So in your example, if we change the order of the column names:

    > val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
    > val cols = List("to_value","from_value")
    > 
    > df.select(mappingExpr(cols, df)).show(false)
    
    +------------------------------------+
    |mapped                              |
    +------------------------------------+
    |[to_value -> xyz1, from_value -> 66]|
    |[to_value -> abc1, from_value -> 67]|
    |[to_value -> fgr1, from_value -> 68]|
    |[to_value -> yte1, from_value -> 69]|
    |[to_value -> erx1, from_value -> 70]|
    |[to_value -> ter1, from_value -> 71]|
    +------------------------------------+
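
    If you want to double-check the key order (a quick sanity check, not from the original answer; map_keys has been available since Spark 2.3), each row will list the keys in the order of cols, i.e. [to_value, from_value] here:

    import org.apache.spark.sql.functions.map_keys

    df.select(map_keys(mappingExpr(cols, df)).as("keys")).show(false)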
    

    Column values can be null and need to be coalesced to empty strings

    This is handled in the inner function getValue with Spark's when SQL function: when the column value is null it returns an empty string, otherwise it returns the column value: when(col(columnName).isNull, lit("")).otherwise(col(columnName)). So any null value in your dataframe is replaced by an empty string:

    > val df = Seq((66, null,"a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
    > val cols = List("from_value","to_value")
    > 
    > df.select(mappingExpr(cols, df)).show(false)
    
    +------------------------------------+
    |mapped                              |
    +------------------------------------+
    |[from_value -> 66, to_value -> ]    |
    |[from_value -> 67, to_value -> abc1]|
    |[from_value -> 68, to_value -> fgr1]|
    |[from_value -> 69, to_value -> yte1]|
    |[from_value -> 70, to_value -> erx1]|
    |[from_value -> 71, to_value -> ter1]|
    +------------------------------------+
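
    A slightly terser alternative for the null handling (a sketch, not what the answer above uses, and it assumes string-typed map values are acceptable) is coalesce with an explicit cast:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{coalesce, col, lit}

    // coalesce returns the first non-null value; the cast keeps all map values string-typed
    def getValue(columnName: String): Column =
      coalesce(col(columnName).cast("string"), lit(""))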
    

    Columns named in the list may not exist in the dataframe

    You can retrieve a dataframe's list of columns with the columns method. I use it to filter out every column name that is not in the dataframe, in the line .filter(dataframe.columns.contains). So when the column name list contains a name that does not exist in the dataframe, it is simply ignored:

    > val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
    > val cols = List("a_column_that_does_not_exist", "from_value","to_value")
    > 
    > df.select(mappingExpr(cols, df)).show(false)
    
    +------------------------------------+
    |mapped                              |
    +------------------------------------+
    |[from_value -> 66, to_value -> xyz1]|
    |[from_value -> 67, to_value -> abc1]|
    |[from_value -> 68, to_value -> fgr1]|
    |[from_value -> 69, to_value -> yte1]|
    |[from_value -> 70, to_value -> erx1]|
    |[from_value -> 71, to_value -> ter1]|
    +------------------------------------+
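
    Finally, if you need the result on the driver as plain Scala maps (a sketch, assuming a SparkSession named spark with its implicits in scope; the map values end up as strings because getValue falls back to ""), you can decode the map column as a Dataset:

    import spark.implicits._

    val maps: Array[Map[String, String]] =
      df.select(mappingExpr(cols, df)).as[Map[String, String]].collect()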
    

    [Comments]

    • This looks like a great solution. Thanks!