【问题标题】:Spark reduce and aggregate on same data-setSpark 在同一数据集上减少和聚合
【发布时间】:2016-10-04 16:39:10
【问题描述】:

我有一个文本文件,我阅读它然后使用split 操作进行拆分。这会产生一个带有Array(A, B, C, D, E, F, G, H, I) 的RDD。

我想为每个键 E 找到 max(F) - min(G)(通过键 E 减少)。然后我想通过键 C 组合得到的值,并用相同的键连接每一行的总和结果。

例如:

+--+--+--+--+
| C| E| F| G|
+--+--+--+--+
|en| 1| 3| 1|
|en| 1| 4| 0|
|nl| 2| 1| 1|
|nl| 2| 5| 2|
|nl| 3| 9| 3|
|nl| 3| 6| 4|
|en| 4| 9| 1|
|en| 4| 2| 1|
+-----------+

应该会导致

+--+--+-------------+---+
| C| E|max(F)-min(G)|sum|
+--+--+-------------+---+
|en| 1| 4           |12 |
|nl| 2| 4           |10 |
|nl| 3| 6           |10 |
|en| 4| 8           |12 |
+--+--+-------------+---+

解决这个问题的最佳方法是什么?目前我正在尝试通过运行来执行max(F)-min(G)

val maxCounts = logEntries.map(line => (line(4), line(5).toLong)).reduceByKey((x, y) => math.max(x, y))
val minCounts = logEntries.map(line => (line(4), line(6).toLong)).reduceByKey((x, y) => math.min(x, y))

val maxMinCounts = maxCounts.join(minCounts).map{ case(id, maxmin) => (id, (maxmin._1 - maxmin._2)) }

然后join 生成的 RDD。但是,当我还想对这些值求和并将它们附加到我现有的数据集中时,这变得很棘手。

我很想听听任何建议!

【问题讨论】:

  • 使用 Spark SQL DataFrame 很容易,您可以将 RDD 转换为 DataFrame 并执行所有聚合操作.. 试试这个链接 stackoverflow.com/questions/33882894/…
  • 为什么不将math.maxmath.min 合并到同一个RDD 中?

标签: scala apache-spark aggregate-functions


【解决方案1】:

这种逻辑很容易在 dataframe API 中实现(也)。但是你需要从数组中明确地形成你的列:

val window = Window.partitionBy('C)

val df = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) => (c,e,f,g) }
  .toDF("C","E","F","G")
  .groupBy('C,'E)
  .agg((max('F) - min('G)).as("diff"))
  .withColumn("sum",sum('diff).over(window))   

【讨论】:

  • 感谢您的建议。我将sum(diff) 更改为sum('diff),将Window.partitionBy('v) 更改为Window.partitionBy('C),否则会导致错误。但是,当我尝试运行此代码时,出现以下错误:scala.MatchError: [Ljava.lang.String;@f908897 (of class [Ljava.lang.String;)。使用Window.partitionBy('v) 导致org.apache.spark.sql.AnalysisException: cannot resolve '`v`' given input columns: [C, E, diff];
  • 我通过使用 Spark CSV 阅读器而不是直接读取输入文件解决了这个问题。问题可能与输入中的特殊字符有关。
【解决方案2】:

假设,就像您的示例数据一样,唯一的 E 永远不会跨越多个 C...您可以这样做。

import math.{max,min}

case class FG(f: Int, g: Int) {
  def combine(that: FG) =
    FG(max(f, that.f), min(g, that.g))
  def result = f - g 
}

val result = {
  rdd
  .map{ case Array(_, _, c, _, e, f, g, _, _) => 
    ((c, e), FG(f, g)) }
  .reduceByKey(_ combine _)
  .map{ case ((c, _), fg) =>
    (c, fg.result) }
  .reduceByKey(_+_)  
}

【讨论】:

    猜你喜欢
    • 2017-08-25
    • 1970-01-01
    • 2019-12-22
    • 2019-08-09
    • 1970-01-01
    • 2018-09-30
    • 1970-01-01
    • 2021-06-14
    • 2016-07-27
    相关资源
    最近更新 更多