【问题标题】:Example of the Scala aggregate functionScala 聚合函数的示例
【发布时间】:2011-10-19 04:38:35
【问题描述】:

我一直在寻找,但找不到我能理解的 Scala 中 aggregate 函数的示例或讨论。它看起来很强大。

这个函数可以用来减少元组的值来创建一个多图类型的集合吗?例如:

val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))

应用聚合后:

Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))

另外,你能给出参数zsegopcombop的例子吗?我不清楚这些参数的作用。

【问题讨论】:

    标签: scala aggregate-functions


    【解决方案1】:

    让我们看看一些 ascii 艺术是否没有帮助。考虑aggregate的类型签名:

    def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
    

    另外,请注意A 指的是集合的类型。所以,假设我们在这个集合中有 4 个元素,那么 aggregate 可能会像这样工作:

    z   A   z   A   z   A   z   A
     \ /     \ /seqop\ /     \ /    
      B       B       B       B
        \   /  combop   \   /
          B _           _ B
             \ combop  /
                  B
    

    让我们看一个实际的例子。假设我有一个GenSeq("This", "is", "an", "example"),我想知道其中有多少个字符。我可以写如下:

    注意在下面的 sn-p 代码中使用了par。传递给聚合的第二个函数是在计算单个序列之后调用的函数。 Scala 只能对可以并行化的集合执行此操作。

    import scala.collection.GenSeq
    val seq = GenSeq("This", "is", "an", "example")
    val chars = seq.par.aggregate(0)(_ + _.length, _ + _)
    

    所以,首先它会计算:

    0 + "This".length     // 4
    0 + "is".length       // 2
    0 + "an".length       // 2
    0 + "example".length  // 7
    

    它接下来会做什么无法预测(组合结果的方法不止一种),但它可能会这样做(就像上面的 ascii 艺术):

    4 + 2 // 6
    2 + 7 // 9
    

    在什么时候结束

    6 + 9 // 15
    

    给出最终结果。现在,这在结构上与foldLeft 有点相似,但它有一个额外的功能(B, B) => B,这是 fold 没有的。但是,此功能使其能够并行工作!

    例如,考虑一下,四个初始计算中的每一个都是相互独立的,并且可以并行完成。接下来的两个(导致 6 和 9)可以在它们所依赖的计算完成后启动,但是这两个可以并行运行。

    如上所述并行化的 7 次计算只需同时进行 3 次串行计算。

    实际上,对于如此小的集合,同步计算的成本将足以抵消任何收益。此外,如果你折叠它,总共只需要 4 次计算。但是,一旦您的收藏变得更大,您就会开始看到一些真正的收益。

    另一方面,考虑foldLeft。因为它没有附加功能,所以它不能并行任何计算:

    (((0 + "This".length) + "is".length) + "an".length) + "example".length
    

    必须先计算每个内括号,然后才能进行外括号。

    【讨论】:

    • 我们可以说这类似于map reduce,seqop扮演mapper函数,combop扮演reducer函数?我也是一个新手,并试图理解语义。感谢 ASCII 艺术,绝对有帮助!
    • 是的。树聚合结构是aggregate 存在的关键。
    • 这很令人困惑,因为在您的示例中 combop 从未被调用。你可以通过简单地为第二个参数做任何你想做的事情来看到这一点。例如返回数字 11242414,你​​仍然得到相同的答案 15。
    • 我对此进行了更多研究,发现从未调用 combop 函数,因为您使用的集合不可并行化。如果您在聚合之前调用par,它将确保调用组合。
    【解决方案2】:

    聚合函数不这样做(除了它是一个非常通用的函数,并且可以用来这样做)。你想要groupBy。至少接近。当您从Seq[(String, String)] 开始时,您通过获取元组中的第一项(即(String, String) => String),它将返回Map[String, Seq[(String, String)])进行分组。然后,您必须丢弃 Seq[String, String)] 值中的第一个参数。

    所以

    list.groupBy(_._1).mapValues(_.map(_._2))
    

    你会得到一个Map[String, Seq[(String, String)]。如果您想要 Seq 而不是 Map,请在结果上调用 toSeq。不过,我认为您不能保证生成的 Seq 中的顺序


    聚合是一个更难的函数。

    首先考虑 reduceLeft 和 reduceRight。 让asA 类型元素的非空序列as = Seq(a1, ... an),而f: (A,A) => A 是将A 类型的两个元素组合成一个的某种方式。我会将它记为二元运算符@a1 @ a2 而不是f(a1, a2)as.reduceLeft(@) 将计算 (((a1 @ a2) @ a3)... @ an)reduceRight 将括号放在相反的位置,(a1 @ (a2 @... @ an))))。如果@ 恰好是关联的,则不会关心括号。可以将其计算为(a1 @... @ ap) @ (ap+1 @...@an)(两个大括号内也会有括号,但我们不要在意)。然后可以并行执行这两个部分,而 reduceLeft 或 reduceRight 中的嵌套括号强制执行完全顺序计算。但是只有当@ 被认为是关联的并且reduceLeft 方法不知道这一点时才可能进行并行计算。

    仍然可以有方法reduce,其调用者将负责确保操作是关联的。然后reduce 将按照它认为合适的方式对调用进行排序,可能并行进行。确实有这样的方法。

    然而,各种 reduce 方法都有一个限制。 Seq 的元素只能组合成相同类型的结果:@ 必须是 (A,A) => A。但是一个更普遍的问题是将它们组合成一个B。一个以B 类型的值b 开始,并将其与序列的每个元素组合。运算符@(B,A) => B,计算(((b @ a1) @ a2) ... @ an)foldLeft 就是这样做的。 foldRight 做同样的事情,但从 an 开始。在那里,@ 操作没有机会关联。当一个人写b @ a1 @ a2 时,它一定意味着(b @ a1) @ a2,因为(a1 @ a2) 将是错误的类型。所以 foldLeft 和 foldRight 必须是顺序的。

    然而,假设每个A 都可以变成B,让我们用! 来写它,a! 的类型是B。此外假设有一个+ 操作(B,B) => B,并且@ 使得b @ a 实际上是b + a!。与其用@组合元素,不如先用!将它们全部转换为B,然后用+组合它们。那将是as.map(!).reduceLeft(+)。如果+ 是关联的,那么可以使用reduce 来完成,而不是顺序的:as.map(!).reduce(+)。可能有一个假设的方法 as.associativeFold(b, !, +)。

    Aggregate 非常接近这一点。然而,可能有一种比b+a! 更有效的方法来实现b@a 例如,如果类型BList[A],而b@a 是a::b,那么a! 将是a::Nilb1 + b2 将是 b2 ::: b1。 a::b 比 (a::Nil):::b 好得多。要从关联性中受益,但仍使用@,首先将b + a1! + ... + an! 拆分为(b + a1! + ap!) + (ap+1! + ..+ an!),然后返回使用@(b @ a1 @ an) + (ap+1! @ @ an)。一个仍然需要!在 ap+1 上,因为必须从一些 b 开始。 + 仍然是必要的,出现在括号之间。为此,可以将as.associativeFold(!, +) 更改为as.optimizedAssociativeFold(b, !, @, +)

    返回++ 是关联的,或者等效地,(B, +) 是一个半群。实际上,编程中使用的大多数半群恰好也是幺半群,即它们在 B 中包含一个中性元素 z(对于 ),因此对于每个 bz + b = b + z = b。在这种情况下,有意义的! 操作很可能是a! = z @ a。此外,z 是一个中性元素b @ a1 ..@ an = (b + z) @ a1 @ an,即b + (z + a1 @ an)。所以总是可以用 z 开始聚合。如果需要b,则在最后执行b + result。有了所有这些假设,我们可以做 as.aggregate(z, @, +)。这就是aggregate 所做的。 @seqop 参数(应用于序列 z @ a1 @ a2 @ ap),+combop(应用于已经部分合并的结果,如在(z + a1@...@ap) + (z + ap+1@...@an))。

    总而言之,as.aggregate(z)(seqop, combop) 计算与 as.foldLeft(z)( seqop) 相同的东西,前提是

    • (B, combop, z) 是一个幺半群
    • seqop(b,a) = combop(b, seqop(z,a))

    聚合实现可以使用组合的关联性来对计算进行分组(但是不交换元素,+ 不必是可交换的, ::: 不是)。它可以并行运行它们。

    最后,使用aggregate 解决最初的问题留给读者作为练习。提示:使用foldLeft 实现,然后找到满足上述条件的zcombo

    【讨论】:

      【解决方案3】:

      具有 A 类型元素的集合的签名是:

      def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B 
      
      • z 是 B 类型的对象,充当中性元素。如果你想计算一些东西,你可以使用 0,如果你想构建一个列表,可以从一个空列表开始,等等。
      • segop 类似于您传递给 fold 方法的函数。它有两个参数,第一个与您传递的中性元素类型相同,表示在上一次迭代中已经聚合的内容,第二个是您集合的下一个元素。结果还必须是 B 类型。
      • combop: 是将两个结果合二为一的函数。

      在大多数集合中,聚合在TraversableOnce 中实现为:

        def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B 
          = foldLeft(z)(seqop)
      

      因此combop 被忽略。但是,对于并行集合是有意义的,因为seqop 将首先在本地并行应用,然后调用combop 来完成聚合。

      因此,对于您的示例,您可以先尝试折叠:

      val seqOp = 
        (map:Map[String,Set[String]],tuple: (String,String)) => 
          map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )
      
      
      list.foldLeft( Map[String,Set[String]]() )( seqOp )
      // returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
      

      那么你必须找到一种折叠两个多图的方法:

      val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
             (map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() ) { 
               (result,k) => 
                 result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) ) 
             } 
      

      现在,您可以并行使用聚合:

      list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
      //Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
      

      将方法“par”应用于列表,从而使用列表的并行集合(scala.collection.parallel.immutable.ParSeq)来真正利用多核处理器。如果没有“par”,则不会有任何性能提升,因为聚合不是在并行集合上完成的。

      【讨论】:

        【解决方案4】:

        aggregate 类似于foldLeft,但可以并行执行。

        作为missingfactor saysaggregate(z)(seqop, combop) 的线性版本等价于foldleft(z)(seqop)。然而,这在并行情况下是不切实际的,在这种情况下,我们不仅需要将下一个元素与前一个结果结合起来(就像在正常折叠中一样),而且我们希望将可迭代对象拆分为子可迭代对象,我们将其称为聚合并需要再次结合这些。 (按从左到右的顺序,但不是关联的,因为我们可能已经在可迭代的第一个部分之前组合了最后一部分。)这种重新组合通常是不平凡的,因此,需要一种方法 (S, S) => S 来做到这一点。

        ParIterableLike中的定义是:

        def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
          executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
        }
        

        确实使用了combop

        供参考,Aggregate 定义为:

        protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
          extends Accessor[S, Aggregate[S]] {
            @volatile var result: S = null.asInstanceOf[S]
            def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
            protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
            override def merge(that: Aggregate[S]) = result = combop(result, that.result)
        }
        

        重要的部分是merge,其中combop 应用了两个子结果。

        【讨论】:

        • 这个是一个真正“明白”afa aggregate及其在树状结构聚合中的有用性的答案。
        【解决方案5】:

        这里是关于聚合如何在具有基准的多核处理器上启用性能的博客。 http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/

        这是“Scala Days 2011”中有关“Scala 并行集合”演讲的视频。 http://days2011.scala-lang.org/node/138/272

        视频说明

        Scala 并行集合

        亚历山大·普罗科佩克

        随着处理器内核数量的增加,并行编程抽象变得越来越重要。高级编程模型使程序员能够更多地关注程序,而不是低级细节,例如同步和负载平衡。 Scala 并行集合扩展了 Scala 集合框架的编程模型,提供对数据集的并行操作。 演讲将描述并行收集框架的架构,解释它们的实现和设计决策。将描述具体的集合实现,例如并行哈希映射和并行哈希尝试。最后,将展示几个示例应用程序,在实践中演示编程模型。

        【讨论】:

          【解决方案6】:

          aggregateTraversableOnce源码中的定义是:

          def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
            foldLeft(z)(seqop)
          

          这与简单的foldLeft 没有什么不同。 combop 似乎没有在任何地方使用。我自己对这种方法的目的感到困惑。

          【讨论】:

            【解决方案7】:

            只是为了澄清我之前的解释,理论上这个想法是 聚合应该像这样工作,(我已经更改了参数的名称以使其更清晰):

            Seq(1,2,3,4).aggragate(0)(
                 addToPrev = (prev,curr) => prev + curr, 
                 combineSums = (sumA,sumB) => sumA + sumB)
            

            应该在逻辑上翻译成

            Seq(1,2,3,4)
                .grouped(2) // split into groups of 2 members each
                .map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
                .foldLeft(0)(sumA,sumB => sumA + sumB)
            

            由于聚合和映射是分开的,所以理论上可以将原始列表分成不同大小的不同组,并行运行,甚至在不同的机器上运行。 实际上,scala 当前的实现默认不支持此功能,但您可以在自己的代码中执行此操作。

            【讨论】:

              猜你喜欢
              • 2013-09-25
              • 2021-04-07
              • 1970-01-01
              • 2021-09-27
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2011-06-12
              • 1970-01-01
              相关资源
              最近更新 更多