将合并应用为seqop 和comboop 怎么样?
val in = Array(
Map("a" -> 1, "b" -> 2),
Map("a" -> 11, "c" -> 4),
Map("b" -> 7, "c" -> 10)
)
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
in.par.aggregate(Map[String, Int]())(merge, merge)
更新
您传递给aggregate 初始累加器值(空映射)和两个闭包 - seqop 和 comboop。
并行序列分成几个分区进行并行处理。通过将seqop 依次应用于累加器和数组元素来处理每个分区。
def seqop(
accumulator: Map[String, Int],
element: Map[String, Int]): Map[String, Int] = merge(accumulator, element)
seqop 获取初始累加器值和第一个数组元素并将其合并。接下来它需要上一个结果和下一个数组元素,依此类推,直到整个分区合并到一个映射中。
当每个分区合并到一个单独的映射中时,这些映射应该通过应用comboop 进行组合。 comboop 从第一个分区获取合并映射,从第二个分区获取合并映射并将其合并在一起。接下来,它从第三个分区获取先前的结果和映射,依此类推,直到所有都合并到一个映射中。这是aggregate 的结果。
def comboop(
m1: Map[String, Int],
m2: Map[String, Int]): Map[String, Int] = merge(m1, m2)
seqop 和 comboop 相同只是巧合。一般来说,它们在逻辑和签名上有所不同。