我将解释Spark中Aggregate操作的概念如下:
聚合函数的定义
**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) --> 4 表示我们的 Spark 集群中可用的分区数。
因此,rdd 被分为 4 个分区:
11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16
我们将问题陈述分为两部分:
问题的第一部分是汇总每个象限中采摘的花朵总数;这就是分区内序列聚合
11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65
问题的第二部分是将这些单独的聚合跨分区求和;这就是分区间聚合。
36 + 75 + 108 + 65 = 284
存储在 RDD 中的总和可以进一步用于任何类型的转换或其他操作
所以代码变成这样:
val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x,y) => (x+y)) 或
val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284
解释: (0) - 是累加器
第一个+是分区内总和,加上花园每个象限中每个采摘者采摘的花朵总数。
第二个+是分区间总和,它聚合了每个象限的总和。
案例 1:
假设,如果我们需要在初始值之后减少函数。如果初始值不为零会怎样?如果是 4,例如:
该数字将添加到每个分区内聚合以及分区间聚合:
所以第一个计算是:
11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70
这里是初始值为5的分区间聚合计算:
partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309
因此,进入您的查询:总和可以根据 rdd 数据分布的分区数来计算。我认为您的数据分布如下,这就是为什么您的结果为 (19, 4)。因此,在进行聚合操作时,请指定分区值的数量:
val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
结果:
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)
解释:因为你的数据分布在8个分区,结果是这样的(使用上面解释的逻辑)
分区内加法:
0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5
total=18
分区间计算:
18+1 (1+2+1+3+1+4+1+5+1) = 19
谢谢