【问题标题】:How does Spark aggregate function - aggregateByKey work?Spark 聚合函数 - aggregateByKey 是如何工作的?
【发布时间】:2014-09-08 09:19:25
【问题描述】:

假设我在 3 个节点上有一个分布式系统,我的数据分布在这些节点之间。例如,我有一个 test.csv 文件,它存在于所有 3 个节点上,它包含 2 列:

**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 

然后我使用 SparkContext.textFile 将文件读取为 rdd 等。据我了解,每个火花工作节点都会从文件中读取 a 部分。所以现在假设每个节点都将存储:

  • 节点 1:第 1~4 行
  • 节点 2:第 5~8 行
  • 节点 3:第 9~12 行

我的问题是,假设我想对这些数据进行计算,并且需要将键组合在一起的一个步骤,因此键值对将是 [k1 [{k1 c1} {k1 c2} {k1 c3}]].. 等等。

有一个函数叫groupByKey(),使用起来很贵,推荐使用aggregateByKey()。所以我想知道groupByKey()aggregateByKey() 如何在幕后工作?有人可以使用我上面提供的示例进行解释吗?洗牌后,行在每个节点上的位置?

【问题讨论】:

    标签: apache-spark distributed-computing


    【解决方案1】:

    aggregateByKey() 几乎与reduceByKey() 相同(两者都在后台调用combineByKey()),除了您为aggregateByKey() 指定一个起始值。大多数人都熟悉reduceByKey(),所以我将在解释中使用它。

    reduceByKey() 更好的原因是因为它利用了称为组合器的 MapReduce 功能。像+* 这样的任何函数都可以以这种方式使用,因为调用它的元素的顺序无关紧要。这允许 Spark 使用相同的键开始“减少”值,即使它们尚未全部位于同一分区中。

    另一方面,groupByKey() 为您提供了更多功能,因为您编写了一个接受 Iterable 的函数,这意味着您甚至可以将所有元素拉入一个数组中。然而,它的效率很低,因为它必须在一个分区中才能工作完整的(K,V,) 对。

    在 reduce 类型操作中移动数据的步骤通常称为 shuffle,在最简单的级别上,数据被分区到每个节点(通常使用哈希分区器),然后在每个节点上排序。

    【讨论】:

    • 好的,让我们回到我的例子,如果 node1 有 row1~row3,node2 有 row4~row6,node3 有 row7 到 row12。当我做groupByKey时,数据会完全移动还是没有移动,因为具有相同键的rdd已经在同一个节点上?谢谢
    • @EdwinGuo 不,数据仍然可以移动,假设您正在使用哈希分区器,如果所有 k1 都在节点 1 上但 k1 的哈希分区器结果为 3,它仍然会转到第三个节点
    • 但是如果我不关心顺序,我只想返回一个包含所有值的数组,就像 groupByKey 一样。是否可以使用除 groupbykey 之外的其他语法?
    • @AdrianoAlmeida 如果您甚至不想将相同的键放入同一个数组中,您可以使用 glom
    【解决方案2】:

    aggregateByKey() 与 reduceByKey 完全不同。发生的情况是 reduceByKey 是 aggregateByKey 的一种特殊情况。

    aggregateByKey() 将组合特定键的值,这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区内组合(“添加”)值(在同一节点中执行)以及如何组合来自不同分区(可能在不同节点中)的结果。 reduceByKey 是一种特殊情况,从某种意义上说,组合的结果(例如求和)与值的类型相同,并且从不同分区组合时的操作也与组合内部值时的操作相同分区。

    一个例子: 想象一下,您有一个配对列表。你并行化它:

    val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
    

    现在您想通过键“组合”它们产生一个总和。在这种情况下 reduceByKey 和 aggregateByKey 是一样的:

    val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
    resReduce.collect
    res3: Array[(String, Int)] = Array((b,7), (a,9))
    
    //0 is initial value, _+_ inside partition, _+_ between partitions
    val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
    resAgg.collect
    res4: Array[(String, Int)] = Array((b,7), (a,9))
    

    现在,假设您希望聚合是一组值,这是与整数不同的类型(整数的总和也是整数):

    import scala.collection.mutable.HashSet
    //the initial value is a void Set. Adding an element to a set is the first
    //_+_ Join two sets is the  _++_
    val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
    sets.collect
    res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
    

    【讨论】:

    • 对这两者如何工作的非常彻底的回答,不胜感激!
    • 能不能也发个java代码,scala看不懂
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-04-07
    • 1970-01-01
    • 1970-01-01
    • 2018-01-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多