【问题标题】:Calculating sum,count of multiple top K values spark计算总和,多个前 K 值的计数火花
【发布时间】:2017-08-13 12:52:27
【问题描述】:

我有一个格式的输入数据框

+---------------------------------+
|name| values |score    |row_number|
+---------------------------------+
|A    |1000   |0        |1        |
|B    |947    |0        |2        |
|C    |923    |1        |3        |
|D    |900    |2        |4        |
|E    |850    |3        |5        |
|F    |800    |1        |6        |
+---------------------------------+

当 score > 0 和 row_number 0 时,我需要获取 sum(values) 和所有值的总和。

我可以通过对前 100 个值运行以下查询来实现这一点

val top_100_data = df.select(
      count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"),
      sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"),
      sum(when(col("row_number") <=100, col(values))).alias("total_sum_100")
    )

但是,我需要获取前 100,200,300 ......2500 个的数据。这意味着我需要运行此查询 25 次,最后合并 25 个数据帧。

我是 spark 新手,但我仍然在搞清楚很多事情。解决这个问题的最佳方法是什么?

谢谢!!

【问题讨论】:

    标签: apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    您可以创建一个Array 的限制为

    val topFilters = Array(100, 200, 300) // you can add more
    

    然后您可以遍历topFilters 数组并创建您需要的dataframe我建议你使用join 而不是union,因为join 会给你单独的columnsunions 会给你单独的rows。您可以执行以下操作

    鉴于你的dataframe

    +----+------+-----+----------+
    |name|values|score|row_number|
    +----+------+-----+----------+
    |A   |1000  |0    |1         |
    |B   |947   |0    |2         |
    |C   |923   |1    |3         |
    |D   |900   |2    |200       |
    |E   |850   |3    |150       |
    |F   |800   |1    |250       |
    +----+------+-----+----------+
    

    您可以使用上面定义的topFilters 数组

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    var finalDF : DataFrame = Seq("1").toDF("rowNum")
    for(k <- topFilters) {
      val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k"))
      finalDF = finalDF.join(top_100_data, Seq("rowNum"))
    }
    finalDF.show(false)
    

    哪个应该给你最终的dataframe

    +------+-------------+-------------+-------------+
    |rowNum|total_sum_100|total_sum_200|total_sum_300|
    +------+-------------+-------------+-------------+
    |1     |923          |1773         |3473         |
    +------+-------------+-------------+-------------+
    

    您可以对您拥有的 25 个限制执行相同的操作。

    如果你打算使用union,那么思路同上。

    希望回答对你有帮助

    更新

    如果您需要联合,那么您可以使用上面定义的相同限制数组应用以下逻辑

    var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum")
    for(k <- topFilters) {
      val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"),
        sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"),
        sum(when(col("row_number") <=k, col("values"))).alias("total_sum"))
      finalDF = finalDF.union(top_100_data)
    }
    finalDF.filter(col("limit") =!= 0).show(false)
    

    这应该给你

    +-----+-----+------------+---------+
    |limit|count|sum_filtered|total_sum|
    +-----+-----+------------+---------+
    |100  |1    |923         |2870     |
    |200  |3    |2673        |4620     |
    |300  |4    |3473        |5420     |
    +-----+-----+------------+---------+
    

    【讨论】:

    • 嗨!感谢您的回答,这非常有帮助!所以我需要每 K 3 列(sum_100_filtered_score,total_sum_100,count_filtered_score_100)。通过加入数据集,我得到每个字段的一列。这就是我尝试使用联合的原因
    • 查看我的更新答案 :) 如果它真的对你有帮助,你可以接受并投票
    猜你喜欢
    • 2014-09-16
    • 1970-01-01
    • 1970-01-01
    • 2014-05-18
    • 1970-01-01
    • 1970-01-01
    • 2018-02-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多