【问题标题】:Flatten a Seq of Maps to Map using Type polymorphism in Scala, Spark UDF在 Scala、Spark UDF 中使用类型多态将 Map 的 Seq 展平为 Map
【发布时间】:2019-08-10 06:00:05
【问题描述】:

我有以下函数将字符串映射序列展平为双倍。如何使类型字符串成为双泛型?

val flattenSeqOfMaps = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))

我需要类似的东西,

val flattenSeqOfMaps[S,D] = udf { values: Seq[Map[S, D]] => values.flatten.toMap }

谢谢。

编辑 1: 我正在使用火花 2.3。我知道 spark 2.4 中的高阶函数

编辑 2:我离得更近了一点。我需要什么来代替val flattenSeqOfMaps = udf { f _} 中的f _。请比较下面的joinMap 类型签名和flattenSeqOfMaps 类型签名

scala> val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
joinMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))

scala> def f[S,D](values: Seq[Map[S, D]]): Map[S,D] = { values.flatten.toMap}
f: [S, D](values: Seq[Map[S,D]])Map[S,D]

scala> val flattenSeqOfMaps = udf { f _}
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(NullType,NullType,true),Some(List(ArrayType(MapType(NullType,NullType,true),true))))

编辑 3:以下代码对我有用。

scala> val flattenSeqOfMaps = udf { f[String,Double] _}
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))

【问题讨论】:

    标签: scala apache-spark generics


    【解决方案1】:

    虽然您可以将函数定义为

    import scala.reflect.runtime.universe.TypeTag
    
    def flattenSeqOfMaps[S : TypeTag, D: TypeTag] = udf { 
      values: Seq[Map[S, D]] => values.flatten.toMap
    }
    

    然后使用特定的实例:

    val df = Seq(Seq(Map("a" -> 1), Map("b" -> 1))).toDF("val")
    
    val flattenSeqOfMapsStringInt = flattenSeqOfMaps[String, Int]
    
    df.select($"val", flattenSeqOfMapsStringInt($"val") as "val").show
    
    +--------------------+----------------+
    |                 val|             val|
    +--------------------+----------------+
    |[[a -> 1], [b -> 1]]|[a -> 1, b -> 1]|
    +--------------------+----------------|
    

    也可以使用内置函数,而无需显式泛型:

    import org.apache.spark.sql.functions.{expr, flatten, map_from_arrays}
    
    def flattenSeqOfMaps_(col: String) = {
      val keys = flatten(expr(s"transform(`$col`, x -> map_keys(x))"))
      val values = flatten(expr(s"transform(`$col`, x -> map_values(x))"))
      map_from_arrays(keys, values)
    }
    
    df.select($"val", flattenSeqOfMaps_("val") as "val").show
    
    +--------------------+----------------+
    |                 val|             val|
    +--------------------+----------------+
    |[[a -> 1], [b -> 1]]|[a -> 1, b -> 1]|
    +--------------------+----------------+
    

    【讨论】:

    • 我的错。我错过了提到 spark 2.3 版本。编辑问题。
    【解决方案2】:

    以下代码对我有用。

    scala> def f[S,D](values: Seq[Map[S, D]]): Map[S,D] = { values.flatten.toMap}
    f: [S, D](values: Seq[Map[S,D]])Map[S,D]
    
    scala> val flattenSeqOfMaps = udf { f[String,Double] _}
    flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-07-16
      • 2019-05-21
      • 2023-02-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-28
      • 2020-12-30
      相关资源
      最近更新 更多