【发布时间】:2021-03-31 01:05:27
【问题描述】:
我需要合并两个数据框并按键组合列。这两个datafrmaes具有相同的架构,例如:
root
|-- id: String (nullable = true)
|-- cMap: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
我想按“id”分组并将“cMap”聚合在一起以进行重复数据删除。 我试过代码:
val df = df_a.unionAll(df_b).groupBy("id").agg(collect_list("cMap") as "cMap").
rdd.map(x => {
var map = Map[String,String]()
x.getAs[Seq[Map[String,String]]]("cMap").foreach( y =>
y.foreach( tuple =>
{
val key = tuple._1
val value = tuple._2
if(!map.contains(key))//deduplicate
map += (key -> value)
}))
Row(x.getAs[String]("id"),map)
})
但似乎 collect_list 不能用于映射结构:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but map<string,string> was passed as parameter 1..;
这个问题还有其他解决方案吗?
【问题讨论】:
-
你能升级到 2.x 吗? 2.x 中的聚合函数不需要 Hive
-
您的错误是您使用的是
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList,但您必须使用import org.apache.spark.sql.functions.collect_list,然后它应该可以工作
标签: scala apache-spark apache-spark-sql