【问题标题】:Passing a map with struct-type key into a Spark UDF将具有结构类型键的映射传递到 Spark UDF
【发布时间】:2017-01-23 21:08:19
【问题描述】:

我想编写一个 Spark 1.6 UDF,它采用以下映射:

case class MyRow(mapping: Map[(Int, Int), Double])

val data = Seq(
  MyRow(Map((1, 1) -> 1.0))
)
val df = sc.parallelize(data).toDF()

df.printSchema()

root
 |-- mapping: map (nullable = true)
 |    |-- key: struct
 |    |-- value: double (valueContainsNull = false)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)

(附带说明:我发现上面的输出很奇怪,因为键的类型打印在值的类型下方,这是为什么呢?)

现在我将我的 UDF 定义为:

val myUDF = udf((inputMapping: Map[(Int,Int), Double]) =>
  inputMapping.map { case ((i1, i2), value) => ((i1 + i2), value) }
)

df
  .withColumn("udfResult", myUDF($"mapping"))
  .show()

但这给了我:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2

所以我尝试用自定义case class 替换(Int,Int),因为如果我想将struct 传递给UDF,我通常会这样做:

case class MyTuple2(i1: Int, i2: Int)
val myUDF = udf((inputMapping: Map[MyTuple2, Double]) => 
  inputMapping.map { case (MyTuple2(i1, i2), value) => ((i1 + i2), value) }
)

这很奇怪:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(mapping)' due to data type mismatch: argument 1 requires map<struct<i1:int,i2:int>,double> type, however, 'mapping' is of map<struct<_1:int,_2:int>,double> type.

由于类型匹配,我不理解上述异常。

我发现的唯一(丑陋)解决方案是传递一个org.apache.spark.sql.Row,然后“提取”结构的元素:

val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
  .map { case (key, value) => ((key.getInt(0), key.getInt(1)), value) } // extract Row into Tuple2
  .map { case ((i1, i2), value) => ((i1 + i2), value) }
)

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    据我所知,在这种情况下使用Row 是无可避免的:在映射(或另一个元组/案例类/数组...)中使用的元组(或案例类)是一个 嵌套结构,因此它在传递到 UDF 时将表示为 Row

    我可以建议的唯一改进是使用Row.unapply 来简化代码:

    val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
      .map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }
    )
    

    【讨论】:

    • 谢谢,不知道Rowunnaply
    猜你喜欢
    • 2019-01-18
    • 2017-09-09
    • 2020-06-16
    • 2018-02-02
    • 2020-08-10
    • 2012-02-06
    • 1970-01-01
    • 1970-01-01
    • 2017-07-06
    相关资源
    最近更新 更多