在您继续之前:此操作又是另一个groupByKey。虽然它有多个合法应用程序,但相对昂贵,因此请务必仅在需要时使用它。
不是完全简洁或高效的解决方案,但您可以使用 Spark 1.5.0 中引入的UserDefinedAggregateFunction:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
示例用法:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
您还可以创建一个 Python 包装器,如 Spark: How to map Python with Scala or Java User Defined Functions? 所示
在实践中,提取 RDD、groupByKey、mkString 并重建 DataFrame 会更快。
将collect_list函数(Spark >= 1.6.0)与concat_ws结合可以得到类似的效果:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))