【发布时间】:2023-06-08 19:42:02
【问题描述】:
您好,我有一个数据框 PL_join_LFD_ranked 如下:
+-----------+-----------+----------+--------+--------+-------------+
|FACILITY_ID|LOCATION_ID|PATIENT_ID|DISTANCE|CAPACITY|rank_distance|
+-----------+-----------+----------+--------+--------+-------------+
|FAC003 |LOC0001 |P1 |54 |3 |2 |
|FAC002 |LOC0001 |P1 |45 |2 |1 |
|FAC003 |LOC0001 |P2 |54 |3 |2 |
|FAC002 |LOC0001 |P2 |45 |2 |1 |
|FAC006 |LOC0010 |P3 |12 |2 |1 |
|FAC003 |LOC0010 |P3 |54 |3 |4 |
fac_cap_map如下
Map(FAC004 -> 0, FAC003 -> 0, FAC007 -> 0, FAC002 -> 0, FAC006 -> 0, FAC005 -> 0)
我想创建一个新的列当前容量,为了计算我创建了一个 UDF。
def cur_cap_udf(m: Map[Any, Int]) = udf( (cap: Int,fac:String) =>
m foreach {case (key,value) => if ((key == fac) && (value < cap) ) value +1 else value}
)
调用 udf
val finaldf1 = PL_join_LFD_ranked.withColumn("current_capacity", cur_cap_udf(fac_cap_map)(PL_join_LFD_ranked("CAPACITY"),PL_join_LFD_ranked("FACILITY_ID")))
我得到的错误如下
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported
原因 Foreach 返回单位类型。尝试使用 Foldleft ,但是当需要最后一次操作的结果时使用 foldleft。但这里不是这样。
我只是检查 map 中的值是否小于传递给 udf 的容量,然后将 map 值增加 1。这是当前容量的逻辑。
【问题讨论】:
-
你的 UDF 不返回任何使用 map 而不是 foreach 的东西
标签: scala apache-spark user-defined-functions