谁能帮我解释它是如何工作的并给我一个输出
(10,4)
使用聚合时,提供三个参数:
- 从分区中累积元素的初始值,通常是中性元素
给定分区的函数,将在其中累积结果
合并两个分区的函数
因此,在您的情况下,分区的初始值是元组 (0, 0)。
然后,您定义的累加器函数会将您正在遍历的当前元素与元组的第一个元素相加,并将元组的第二个元素加一。实际上,它会计算分区中元素的总和及其元素数。
combiner 函数合并了两个元组。正如您定义的那样,它将求和并计算 2 个分区的元素数。在您的情况下未使用它,因为您按顺序遍历管道。您可以在 List 上调用 .par,以便获得并行实现以查看组合器的运行情况(注意它必须是关联函数)。
因此你得到 (10, 4) 因为 1+2+3+4=10 并且列表中有 4 个元素(你做了 4 次添加)。
您可以在累加器函数中添加一个打印语句(在顺序输入上运行),以查看它的行为:
Acc: (0,0) - value:1
Acc: (1,1) - value:2
Acc: (3,2) - value:3
Acc: (6,3) - value:4
我知道普通聚合在 scala 中的工作原理及其在折叠上的使用。
对于顺序输入,aggregate 是 foldLeft:
def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
对于并行输入,列表被分成块,以便多个线程可以单独工作。累加器函数在每个块上运行,使用初始值。当两个线程需要合并它们的结果时,使用combine函数:
def aggregate[S](z: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
tasksupport.executeAndWaitResult(new Aggregate(() => z, seqop, combop, splitter))
}
这是 fork-join 模型的原理,但它要求您的任务可以很好地并行化。这里就是这种情况,因为一个线程不需要知道另一个线程的结果来完成它的工作。