You can use the following mappingExpr function to select certain columns of your dataframe as a map:
import org.apache.spark.sql.functions.{col, lit, map, when}
import org.apache.spark.sql.{Column, DataFrame}

def mappingExpr(columns: Seq[String], dataframe: DataFrame): Column = {
  // Replace a null column value with an empty string
  def getValue(columnName: String): Column =
    when(col(columnName).isNull, lit("")).otherwise(col(columnName))

  map(
    columns
      .filter(dataframe.columns.contains)                                    // drop names that are not in the dataframe
      .flatMap(columnName => Seq(lit(columnName), getValue(columnName))): _* // interleave keys and values
  ).as("mapped")
}
So, given your example data:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
Detailed explanation
The main idea of the function is to turn the list of column names into a list of pairs, where the first element of each pair is the column name as a literal column and the second element is the column value as a column. I then flatten this list of pairs and pass the result to the map Spark SQL function.
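To make that flattening step concrete, here is a small standalone sketch (illustration only, using the two column names from your example):

import org.apache.spark.sql.functions.{col, lit, when}

// Illustration: for two column names, the flatMap step produces an alternating
// key/value sequence, which is exactly what map(...) expects as its varargs.
val pairs = Seq("from_value", "to_value").flatMap { columnName =>
  Seq(lit(columnName), when(col(columnName).isNull, lit("")).otherwise(col(columnName)))
}
// pairs is Seq(lit("from_value"), <from_value expr>, lit("to_value"), <to_value expr>),
// so map(pairs: _*) builds map(key1, value1, key2, value2).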
Now let's go through the different constraints:
The list of column names can contain 0 to 3 column names
As the elements put into the map are built by iterating over the list of column names, the number of column names does not change anything. If we pass an empty list of column names, there is no error:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List()
>
> df.select(mappingExpr(List(), df)).show(false)
+------+
|mapped|
+------+
|[] |
|[] |
|[] |
|[] |
|[] |
|[] |
+------+
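As a small aside, if you preferred an explicitly typed empty map in that case, a hypothetical tweak (not required by your constraints) would be to special-case the empty result with typedLit:

import org.apache.spark.sql.functions.typedLit

// Hypothetical: an explicitly typed empty map column, usable as a fallback
// when no requested column exists in the dataframe.
val emptyMapped = typedLit(Map.empty[String, String]).as("mapped")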
I need the keys in the map to keep their order
This is the trickiest one. Usually, when you create a map, the order is not kept, because of the way a map is implemented. In Spark, however, the order seems to be preserved, so it only depends on the order of the list of column names. So, with your example, if we change the order of the column names:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("to_value","from_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[to_value -> xyz1, from_value -> 66]|
|[to_value -> abc1, from_value -> 67]|
|[to_value -> fgr1, from_value -> 68]|
|[to_value -> yte1, from_value -> 69]|
|[to_value -> erx1, from_value -> 70]|
|[to_value -> ter1, from_value -> 71]|
+------------------------------------+
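If you want to double-check the key order yourself, a quick sanity check (assuming map_keys lists the keys in insertion order) is to inspect the keys of the generated map:

import org.apache.spark.sql.functions.map_keys

// With cols = List("to_value", "from_value"), every row should show [to_value, from_value].
df.select(map_keys(mappingExpr(cols, df)).as("keys")).show(false)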
Column values can be null, and need to be coalesced to an empty string
I do this in the inner function getValue, using Spark's when SQL function: when the column value is null, an empty string is returned, otherwise the column value is returned: when(col(columnName).isNull, lit("")).otherwise(col(columnName)). So when there is a null value in your dataframe, it is replaced by an empty string:
> val df = Seq((66, null,"a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> ] |
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
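As an alternative sketch, the same null handling could be written with coalesce instead of when/otherwise (assuming you cast the column to string first so the value types line up across columns):

import org.apache.spark.sql.functions.{coalesce, col, lit}
import org.apache.spark.sql.Column

// Alternative: coalesce falls back to "" when the column value is null.
// The cast to string keeps the map value type consistent across columns of different types.
def getValue(columnName: String): Column =
  coalesce(col(columnName).cast("string"), lit(""))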
A column specified in the list may not exist in the dataframe
You can retrieve the list of a dataframe's columns with the columns method. So I use this method to filter out every column name that is not in the dataframe, with the line .filter(dataframe.columns.contains). Thus, when the list of column names contains a name that does not exist in the dataframe, it is simply ignored:
> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("a_column_that_does_not_exist", "from_value","to_value")
>
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
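If silently ignoring unknown column names is not what you want, a hypothetical stricter variant (the name mappingExprStrict is just illustrative) could fail fast instead:

import org.apache.spark.sql.{Column, DataFrame}

// Hypothetical stricter variant: throw on unknown column names instead of ignoring them.
def mappingExprStrict(columns: Seq[String], dataframe: DataFrame): Column = {
  val missing = columns.filterNot(dataframe.columns.contains)
  require(missing.isEmpty, s"Columns not found in dataframe: ${missing.mkString(", ")}")
  mappingExpr(columns, dataframe)
}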