【问题标题】:Aggregating multiple columns with custom function in Spark在 Spark 中使用自定义函数聚合多个列
【发布时间】:2016-10-10 19:23:27
【问题描述】:

我想知道是否有某种方法可以为多列上的 spark 数据帧指定自定义聚合函数。

我有一张这样的表格(名称、商品、价格):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

到:

我想将项目和每个人的成本汇总到这样的列表中:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

这在数据帧中可行吗?我最近了解了collect_list,但它似乎只适用于一列。

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql orc


    【解决方案1】:

    考虑在收集为列表之前使用struct 函数将列组合在一起:

    import org.apache.spark.sql.functions.{collect_list, struct}
    import sqlContext.implicits._
    
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
    
    df.groupBy($"name")
      .agg(collect_list(struct($"food", $"price")).as("foods"))
      .show(false)
    

    输出:

    +----+---------------------------------------------+
    |name|foods                                        |
    +----+---------------------------------------------+
    |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
    |bill|[[apple,0.99], [taco,2.59]]                  |
    +----+---------------------------------------------+
    

    【讨论】:

    • 我想提一下,这种方法看起来比接受的答案更干净,但不幸的是不适用于 spark 1.6,因为collect_list() 不接受结构。
    • 在 Spark 2.1 中工作
    【解决方案2】:

    DataFrame 执行此操作的最简单方法是首先收集两个列表,然后将两个列表一起使用UDFzip。比如:

    import org.apache.spark.sql.functions.{collect_list, udf}
    import sqlContext.implicits._
    
    val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
    
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
    
    val df2 = df.groupBy("name").agg(
      collect_list(col("food")) as "food",
      collect_list(col("price")) as "price" 
    ).withColumn("food", zipper(col("food"), col("price"))).drop("price")
    
    df2.show(false)
    # +----+---------------------------------------------+
    # |name|food                                         |
    # +----+---------------------------------------------+
    # |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
    # |bill|[[apple,0.99], [taco,2.59]]                  |
    # +----+---------------------------------------------+
    

    【讨论】:

    • 我使用 col(...) 而不是 $"..." 是有原因的——我发现 col(...)class 定义之类的内容中工作量较少。
    • 是否有任何函数可以重新对齐列,例如在 zip 函数中告诉它首先从列的尾部添加一个元素,然后从头部删除一个元素,然后压缩它们?在这种情况下,例如,如果您每天阅读价格并且有一个时间列,那么您可以获得商品的下一个价格。
    • 答案假设(也许是正确的)collect_list() 将保留食物和价格两列的元素顺序。这意味着同一行的食物和价格将在两个收集的列表中以相同的索引结束。这种订单保留行为是否得到保证? (这是有道理的,但我不确定通过查看 collect_list 的 scala 代码,而不是 scala 程序员)。
    • Afaik,不能保证元素的顺序是相同的。参考:stackoverflow.com/questions/40407514/…
    • 我使用此解决方案的变体将五个列表压缩在一起。这让我有机会编写迄今为止我职业生涯中最好的代码行:_ zip _ zip _ zip _ zip _
    【解决方案3】:

    也许比 zip 函数更好的方法(因为 UDF 和 UDAF 对性能非常不利)是将这两列包装到 Struct 中。

    这可能也可以:

    df.select('name, struct('food, 'price).as("tuple"))
      .groupBy('name)
      .agg(collect_list('tuple).as("tuples"))
    

    【讨论】:

      【解决方案4】:

      您的观点 collect_list 似乎只适用于一列:为了让 collect_list 在多列上工作,您必须将您想要的列作为聚合包装在一个结构中。 例如:

           val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")) as("food"))
      
           aggregatedData.show
      +----+------------------------------------------------+
      |name|foods                                           |
      +----+------------------------------------------------+
      |john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
      |bill|[[apple, 0.99], [taco, 2.59]]                   |
      +----+------------------------------------------------+
      

      【讨论】:

        【解决方案5】:

        这是一个选项,将数据框转换为 Map 的 RDD,然后在其上调用 groupByKey。结果将是一个键值对列表,其中 value 是一个元组列表。

        df.show
        +----+------+----+
        |  _1|    _2|  _3|
        +----+------+----+
        |john|tomato|1.99|
        |john|carrot|0.45|
        |bill| apple|0.99|
        |john|banana|1.29|
        |bill|  taco|2.59|
        +----+------+----+
        
        
        val tuples = df.map(row => row(0) -> (row(1), row(2)))
        tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43
        
        tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
        res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))
        

        【讨论】:

          猜你喜欢
          • 2017-05-19
          • 1970-01-01
          • 2018-03-30
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多