【问题标题】:Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported线程“主”java.lang.UnsupportedOperationException 中的异常:不支持 Unit 类型的架构
【发布时间】:2023-06-08 19:42:02
【问题描述】:

您好,我有一个数据框 PL_join_LFD_ranked 如下:

+-----------+-----------+----------+--------+--------+-------------+
|FACILITY_ID|LOCATION_ID|PATIENT_ID|DISTANCE|CAPACITY|rank_distance|
+-----------+-----------+----------+--------+--------+-------------+
|FAC003     |LOC0001    |P1        |54      |3       |2            |
|FAC002     |LOC0001    |P1        |45      |2       |1            |
|FAC003     |LOC0001    |P2        |54      |3       |2            |
|FAC002     |LOC0001    |P2        |45      |2       |1            |
|FAC006     |LOC0010    |P3        |12      |2       |1            |
|FAC003     |LOC0010    |P3        |54      |3       |4            |

fac_cap_map如下

Map(FAC004 -> 0, FAC003 -> 0, FAC007 -> 0, FAC002 -> 0, FAC006 -> 0, FAC005 -> 0)

我想创建一个新的列当前容量,为了计算我创建了一个 UDF。

def cur_cap_udf(m: Map[Any, Int]) = udf( (cap: Int,fac:String) =>
      m foreach {case (key,value) => if ((key == fac) && (value < cap) ) value +1 else value}

    )

调用 udf

val finaldf1 = PL_join_LFD_ranked.withColumn("current_capacity", cur_cap_udf(fac_cap_map)(PL_join_LFD_ranked("CAPACITY"),PL_join_LFD_ranked("FACILITY_ID")))

我得到的错误如下

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported

原因 Foreach 返回单位类型。尝试使用 Foldleft ,但是当需要最后一次操作的结果时使用 foldleft。但这里不是这样。

我只是检查 map 中的值是否小于传递给 udf 的容量,然后将 map 值增加 1。这是当前容量的逻辑。

【问题讨论】:

  • 你的 UDF 不返回任何使用 map 而不是 foreach 的东西

标签: scala apache-spark user-defined-functions


【解决方案1】:

我认为,您的问题与 Spark 无关,而是与“如何从 cur_cap 函数返回整数 value”有关。 另外,深入研究一下函数,你想得到什么样的结果,什么样的数据框?

据我了解,使用当前代码,每个设施都将单独评估,因此(key == fac) 每行只有一次True。 也许您应该尝试查看PL_join_LFD_ranked.groupBy(col("FACILITY_ID").agg(sum("CAPACITY"))),然后以某种方式处理容量? (也许,.withColumn("capped_capacity", f.min("capacity_sum", "capacity_cap")

【讨论】:

    【解决方案2】:
    def cur_cap_udf(m: Map[Any, Int]) = udf( (cap: Int,fac:String) =>
          val value = map.getOrElse(fac, cap)
          if(value<cap){
            value+1
          }else{
            value
          }
    
    )
    

    你需要从 udf 返回值。 foreach 返回单位,因此出现错误。

    【讨论】: