【问题标题】:Spark SQL replacement for MySQL's GROUP_CONCAT aggregate functionSpark SQL 替换 MySQL 的 GROUP_CONCAT 聚合函数
【发布时间】:2015-10-16 21:48:46
【问题描述】:

我有一个包含两个字符串类型列(username, friend) 的表,对于每个用户名,我想将其所有朋友收集在一行上,并以字符串形式连接。例如:('username1', 'friends1, friends2, friends3')

我知道 MySQL 使用 GROUP_CONCAT 执行此操作。有没有办法用 Spark SQL 做到这一点?

【问题讨论】:

  • 如果您使用的是 Spark 2.4+,您可以结合使用 collect_list()array_join()。不需要UDF。详情请see my answer

标签: apache-spark aggregate-functions apache-spark-sql


【解决方案1】:

在您继续之前:此操作又是另一个groupByKey。虽然它有多个合法应用程序,但相对昂贵,因此请务必仅在需要时使用它。


不是完全简洁或高效的解决方案,但您可以使用 Spark 1.5.0 中引入的UserDefinedAggregateFunction

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

示例用法:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+

您还可以创建一个 Python 包装器,如 Spark: How to map Python with Scala or Java User Defined Functions? 所示

在实践中,提取 RDD、groupByKeymkString 并重建 DataFrame 会更快。

collect_list函数(Spark >= 1.6.0)与concat_ws结合可以得到类似的效果:

import org.apache.spark.sql.functions.{collect_list, udf, lit}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))

【讨论】:

  • 如果我想在 SQL 中使用它怎么办 我如何在 Spark SQL 中注册这个 UDF?
  • @MurtazaKanchwala There is register method which accepts UDAFS 所以它应该作为标准 UDF 工作。
  • @zero323 在 spark sql 1.4.1 中执行相同操作的任何方法
  • 你不能在评估函数中删除`UTF8String.fromString()`吗?
  • 这是一个很好的解决方案。经过几次修改后,我尝试了它并且工作正常除了我遇到了与生成的 DF 的兼容性问题。如果没有 UTF 异常,我无法将生成的列与其他列进行比较。我改为将 DF 转换为 RDD;做我想做的事,然后将其转换回 DF。这解决了所有问题,此外,解决方案的速度提高了 10 倍。我认为可以肯定地说应该尽可能避免使用udfs
【解决方案2】:

你可以试试collect_list函数

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A

或者你可以注册一个类似的UDF

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))

你可以在查询中使用这个函数

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")

【讨论】:

【解决方案3】:

在 Spark 2.4+ 中,在 collect_list()array_join() 的帮助下,这变得更简单了。

这是 PySpark 中的一个演示,虽然代码在 Scala 中也应该非常相似:

from pyspark.sql.functions import array_join, collect_list

friends = spark.createDataFrame(
    [
        ('jacques', 'nicolas'),
        ('jacques', 'georges'),
        ('jacques', 'francois'),
        ('bob', 'amelie'),
        ('bob', 'zoe'),
    ],
    schema=['username', 'friend'],
)

(
    friends
    .orderBy('friend', ascending=False)
    .groupBy('username')
    .agg(
        array_join(
            collect_list('friend'),
            delimiter=', ',
        ).alias('friends')
    )
    .show(truncate=False)
)

输出:

+--------+--------------------------+
|username|friends                   |
+--------+--------------------------+
|jacques |nicolas, georges, francois|
|bob     |zoe, amelie               |
+--------+--------------------------+

这类似于 MySQL 的 GROUP_CONCAT() 和 Redshift 的 LISTAGG()

【讨论】:

    【解决方案4】:

    这是您可以在 PySpark 中使用的函数:

    import pyspark.sql.functions as F
    
    def group_concat(col, distinct=False, sep=','):
        if distinct:
            collect = F.collect_set(col.cast(StringType()))
        else:
            collect = F.collect_list(col.cast(StringType()))
        return F.concat_ws(sep, collect)
    
    
    table.groupby('username').agg(F.group_concat('friends').alias('friends'))
    

    在 SQL 中:

    select username, concat_ws(',', collect_list(friends)) as friends
    from table
    group by username
    

    【讨论】:

      【解决方案5】:

      使用 pyspark

      byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
      

      如果您想再次将其设为数据框:

      sqlContext.createDataFrame(byUsername, ["username", "friends"])
      

      从 1.6 开始,您可以使用 collect_list 然后加入创建的列表:

      from pyspark.sql import functions as F
      from pyspark.sql.types import StringType
      join_ = F.udf(lambda x: ", ".join(x), StringType())
      df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
      

      【讨论】:

        【解决方案6】:

        语言:Scala Spark 版本:1.5.2

        我遇到了同样的问题,也尝试使用udfs 解决它,但不幸的是,由于类型不一致,这导致代码后面出现更多问题。我能够通过首先将DF 转换为RDD 然后分组 并以所需的方式操作数据然后将RDD 转换回@ 987654325@如下:

        val df = sc
             .parallelize(Seq(
                ("username1", "friend1"),
                ("username1", "friend2"),
                ("username2", "friend1"),
                ("username2", "friend3")))
             .toDF("username", "friend")
        
        +---------+-------+
        | username| friend|
        +---------+-------+
        |username1|friend1|
        |username1|friend2|
        |username2|friend1|
        |username2|friend3|
        +---------+-------+
        
        val dfGRPD = df.map(Row => (Row(0), Row(1)))
             .groupByKey()
             .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
             .toDF("username", "groupOfFriends")
        
        +---------+---------------+
        | username| groupOfFriends|
        +---------+---------------+
        |username1|friend2,friend1|
        |username2|friend3,friend1|
        +---------+---------------+
        

        【讨论】:

          【解决方案7】:

          -- 使用 collect_set 的 spark SQL 解析

          SELECT id, concat_ws(', ', sort_array( collect_set(colors))) as csv_colors
          FROM ( 
            VALUES ('A', 'green'),('A','yellow'),('B', 'blue'),('B','green') 
          ) as T (id, colors)
          GROUP BY id
          

          【讨论】:

            【解决方案8】:

            下面是实现 group_concat 功能的基于 python 的代码。

            输入数据:

            Cust_No,Cust_Cars

            1,丰田

            2、宝马

            1、奥迪

            2、现代

            from pyspark.sql import SparkSession
            from pyspark.sql.types import StringType
            from pyspark.sql.functions import udf
            import pyspark.sql.functions as F
            
            spark = SparkSession.builder.master('yarn').getOrCreate()
            
            # Udf to join all list elements with "|"
            def combine_cars(car_list,sep='|'):
              collect = sep.join(car_list)
              return collect
            
            test_udf = udf(combine_cars,StringType())
            car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)
            

            输出数据: Cust_No,Final_List

            1、丰田|奥迪

            2、宝马|现代

            【讨论】:

              【解决方案9】:

              您还可以使用 Spark SQL 函数 collect_list,之后您需要转换为字符串并使用函数 regexp_replace 替换特殊字符。

              regexp_replace(regexp_replace(regexp_replace(cast(collect_list((column)) as string), ' ', ''), ',', '|'), '[^A-Z0-9|]', '')
              

              这是一种更简单的方法。

              【讨论】:

                【解决方案10】:

                高阶函数 concat_ws()collect_list() 可以与 groupBy()

                一起成为一个不错的选择
                import pyspark.sql.functions as F
                    
                df_grp = df.groupby("agg_col").agg(F.concat_ws("#;", F.collect_list(df.time)).alias("time"), F.concat_ws("#;", F.collect_list(df.status)).alias("status"), F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"))
                

                样本输出

                +-------+------------------+----------------+---------------------+
                |agg_col|time              |status          |llamaType            |
                +-------+------------------+----------------+---------------------+
                |1      |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
                +-------+------------------+----------------+---------------------+
                

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 2015-10-15
                  • 1970-01-01
                  • 2021-04-07
                  • 2011-12-12
                  • 1970-01-01
                  • 2021-12-31
                  • 2018-01-11
                  相关资源
                  最近更新 更多