【问题标题】:Explain the aggregate functionality in Spark (with Python and Scala)解释 Spark 中的聚合功能(使用 Python 和 Scala)
【发布时间】:2015-03-30 05:06:48
【问题描述】:

我正在寻找对通过 python 中的 spark 可用的聚合功能的更好解释。

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(10, 4)

我得到了(10,4) 的预期结果,它是1+2+3+4 和4 个元素的总和。如果我将传递给聚合函数的初始值从(0,0) 更改为(1,0),我会得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(19, 4)

值增加 9。如果我将其更改为 (2,0),则值将变为 (28,4),依此类推。

有人可以向我解释这个值是如何计算的吗?我预计值会增加 1 而不是 9,预计会看到 (11,4) 而我看到的是 (19,4)

【问题讨论】:

    标签: python scala apache-spark aggregate rdd


    【解决方案1】:

    我并没有完全相信 接受的答案,而 JohnKnight 的回答有所帮助,所以这是我的观点:

    首先,用我自己的话解释一下aggregate()

    原型

    聚合(zeroValue,seqOp,combOp)

    说明

    aggregate() 允许您获取一个 RDD 并生成一个与原始 RDD 中存储的类型不同的单个值。

    参数

    1. zeroValue: 初始化值,你的结果,在你想要的 格式。
    2. seqOp:要对 RDD 记录应用的操作。运行一次 分区中的每条记录。
    3. combOp:定义结果对象如何(每个分区一个), 合并。

    示例

    计算一个列表的总和和该列表的长度。以一对(sum, length) 的形式返回结果。

    在 Spark shell 中,我首先创建了一个包含 4 个元素和 2 个分区的列表:

    listRDD = sc.parallelize([1,2,3,4], 2)
    

    然后我定义了我的 seqOp

    seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
    

    还有我的combOp

    combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
    

    然后我汇总:

    listRDD.aggregate( (0, 0), seqOp, combOp)
    Out[8]: (10, 4)
    

    如您所见,我给变量取了描述性名称,但让我进一步解释一下:

    第一个分区有子列表 [1, 2]。我们将 seqOp 应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length),它将在本地反映结果,仅在第一个分区中。

    所以,让我们开始吧:local_result 被初始化为我们提供给aggregate()zeroValue 参数,即(0, 0),list_element 是列表的第一个元素,即1。作为结果是这样的:

    0 + 1 = 1
    0 + 1 = 1
    

    现在,本地结果是 (1, 1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为 1,长度为 1。注意,local_result 得到从 (0, 0) 更新到 (1, 1)。

    1 + 2 = 3
    1 + 1 = 2
    

    现在本地结果是 (3, 2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。

    对第二个分区做同样的事情,我们得到 (7, 2)。

    现在我们将 combOp 应用于每个局部结果,这样我们就可以形成最终的全局结果,如下所示:(3,2) + (7,2) = (10, 4)


    “图”中描述的示例:

                (0, 0) <-- zeroValue
    
    [1, 2]                  [3, 4]
    
    0 + 1 = 1               0 + 3 = 3
    0 + 1 = 1               0 + 1 = 1
    
    1 + 2 = 3               3 + 4 = 7
    1 + 1 = 2               1 + 1 = 2       
        |                       |
        v                       v
      (3, 2)                  (7, 2)
          \                    / 
           \                  /
            \                /
             \              /
              \            /
               \          / 
               ------------
               |  combOp  |
               ------------
                    |
                    v
                 (10, 4)
    

    受到这个伟大的example 的启发。


    所以现在如果zeroValue 不是 (0, 0) 而是 (1, 0),那么人们会期望得到 (8 + 4, 2 + 2) = (12, 4),这不会解释你的经历。即使我们改变了我的例子的分区数,我也无法再得到它。

    这里的关键是 JohnKnight 的回答,其中指出 zeroValue 不仅类似于分区数,而且可能应用的次数比您预期的要多。

    【讨论】:

    • 真的很高兴它帮助了@Neethu!
    • @ab_tech_sp 这真的应该是公认的答案。特别是因为这个 Q 中最受好评的答案是 Scala(??)!
    【解决方案2】:

    使用 Scala 进行解释

    Aggregate 让您可以随意转换和组合 RDD 的值。

    它使用两个功能:

    第一个将原始集合 [T] 的元素转换并添加到本地聚合 [U] 中,并采用以下形式:(U,T) => U。您可以将其视为折叠,因此它也该操作需要零。此操作在本地并行应用于每个分区。

    这里是问题的关键所在:这里应该使用的唯一值是归约操作的零值。 此操作在每个分区上本地执行,因此,将任何内容添加到该零值将添加到结果乘以 RDD 的分区数。

    第二个操作取前一个操作[U]的结果类型的2个值,并将其组合成一个值。此操作将减少每个分区的部分结果并产生实际总数。

    例如: 给定一个字符串的 RDD:

    val rdd:RDD[String] = ???
    

    假设您想要汇总该 RDD 中字符串的长度,您可以这样做:

    1. 第一个操作会将字符串转换为大小(int)并累积大小的值。

      val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`

    2. 为加法运算提供零(0)

      val 零 = 0

    3. 将两个整数相加的操作:

      val add: (Int, Int) => Int = _ + _

    把它们放在一起:

    rdd.aggregate(ZERO, stringSizeCummulator, add)
    

    使用 Spark 2.4 及更高版本

    rdd.aggregate(ZERO)(stringAccumulator,add)
    

    那么,为什么需要零? 当 cummulator 函数应用于分区的第一个元素时,没有运行总计。此处使用零。

    例如。我的 RDD 是:

    • 分区 1:[“跳转”、“结束”]
    • 分区 2:["the", "wall"]

    这将导致:

    P1:

    1. stringSizeCummulator(ZERO, "Jump") = 4
    2. stringSizeCummulator(4, "over") = 8

    P2:

    1. stringSizeCummulator(ZERO, "the") = 3
    2. stringSizeCummulator(3, "wall") = 7

    减少:add(P1, P2) = 15

    【讨论】:

    • 你是对的。当我通过指定不同的值开始使用 spark.default.parallelism 设置时,当我将 (1,0) 作为聚合函数的初始值传递时,每次运行返回的值都会发生变化。你的解释更有意义。谢谢。
    • 关于Python的问题,使用scala的anwser? pyspark中是否存在这种东西?
    • @pltrdy 希望这是这里唯一的问题!这个答案没有解释为什么 OP 会出现这种行为。看起来很有吸引力,我也投了赞成票,但我不认为它回答了这个问题......:/
    【解决方案3】:

    我没有足够的声望点来评论 Maasg 之前的回答。 实际上,零值对 seqop 应该是“中性的”,这意味着它不会干扰 seqop 结果,例如 0 对 add 或 1 对 *;

    你不应该尝试使用非中性值,因为它可能会被任意应用。 此行为不仅与分区数有关。

    我尝试了与问题中所述相同的实验。 对于 1 个分区,零值应用了 3 次。 有 2 个分区,6 次。 有 3 个分区,9 次,这样会继续下去。

    【讨论】:

      【解决方案4】:

      您可以使用以下代码(在 scala 中)准确查看 aggregate 正在做什么。它构建了所有添加和合并操作的树:

      sealed trait Tree[+A]
      case class Leaf[A](value: A) extends Tree[A]
      case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
      
      val zero : Tree[Int] = Leaf(0)
      val rdd = sc.parallelize(1 to 4).repartition(3)
      

      然后,在 shell 中:

      scala> rdd.glom().collect()
      res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))
      

      所以,我们有这 3 个分区:[4]、[1,2] 和 [3]。

      scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
      res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))
      

      您可以将结果表示为一棵树:

      +
      | \__________________
      +                    +
      | \________          | \
      +          +         +   2
      | \        | \       | \         
      0  +       0  3      0  1
         | \
         0  4
      

      您可以看到在驱动节点(树的左侧)上创建了第一个零元素,然后将所有分区的结果一个一个合并。您还看到,如果您像在问题中那样将 0 替换为 1,它将为每个分区上的每个结果添加 1,并且还会为驱动程序的初始值添加 1。所以,你给出的 zero 值被使用的总次数是:

      number of partitions + 1.

      所以,在你的情况下,

      aggregate(
        (X, Y),
        (lambda acc, value: (acc[0] + value, acc[1] + 1)),
        (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
      

      将是:

      (sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
      

      aggregate 的实现非常简单。定义在RDD.scala, line 1107:

        def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
          // Clone the zero value since we will also be serializing it as part of tasks
          var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
          val cleanSeqOp = sc.clean(seqOp)
          val cleanCombOp = sc.clean(combOp)
          val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
          val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
          sc.runJob(this, aggregatePartition, mergeResult)
          jobResult
      }
      

      【讨论】:

        【解决方案5】:

        很好的解释,它真的帮助我理解了聚合函数的底层工作。我玩了一段时间,发现如下。

        • 如果你使用 acc 作为 (0,0) 那么它不会改变函数的输出结果。

        • 如果初始累加器发生变化,那么它将处理结果如下

        [ RDD 元素总和 + acc 初始值 * RDD 分区数 + acc初始值]

        对于这里的问题,我建议检查分区,因为根据我的理解,分区数应该是 8,因为每次我们在 RDD 的分区上处理 seq op 时,它都会从 acc 结果的初始总和开始而且当它要进行comb Op时,它会再次使用acc初始值一次。

        例如 列表 (1,2,3,4) & acc (1,0)

        通过RDD.partitions.size在scala中获取分区

        如果分区为 2 且元素数为 4 则 => [ 10 + 1 * 2 + 1 ] => (13,4)

        如果分区为 4 且元素数为 4 则 => [ 10 + 1 * 4 + 1 ] => (15,4)

        希望对您有所帮助,您可以查看here 以获得解释。谢谢。

        【讨论】:

          【解决方案6】:

          感谢 gsamaras。

          我的视图如下,

          【讨论】:

            【解决方案7】:

            对于寻找上述示例的 Scala 等效代码的人来说,就是这里。相同的逻辑,相同的输入/结果。

            scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
            listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
            
            scala> listRDD.collect()
            res7: Array[Int] = Array(1, 2, 3, 4)
            
            scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
            res10: (Int, Int) = (10,4)
            

            【讨论】:

              【解决方案8】:

              我尝试了很多关于这个问题的实验。最好为聚合设置分区数。 seqOp 将处理每个分区并应用初始值,此外,combOp 在组合所有分区时也会应用初始值。 所以,我提出这个问题的格式:

              final result = sum(list) + num_Of_Partitions * initial_Value + 1
              

              【讨论】:

              • 这个公式显然不能成立,当初始值为0时,结果应该是列表的总和。
              【解决方案9】:

              我将解释Spark中Aggregate操作的概念如下:

              聚合函数的定义

              **def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
              

              val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) --> 4 表示我们的 Spark 集群中可用的分区数。

              因此,rdd 被分为 4 个分区:

              11, 12, 13
              24, 25, 26
              35, 36, 37
              24, 25, 16
              

              我们将问题陈述分为两部分: 问题的第一部分是汇总每个象限中采摘的花朵总数;这就是分区内序列聚合

              11+12+13 = 36
              24+25+26 = 75
              35+36+37 = 108
              24+25 +16 = 65
              

              问题的第二部分是将这些单独的聚合跨分区求和;这就是分区间聚合。

              36 + 75 + 108 + 65 = 284
              

              存储在 RDD 中的总和可以进一步用于任何类型的转换或其他操作

              所以代码变成这样:

              val sum = flowers.aggregate(0)((acc, value) =&gt; (acc + value), (x,y) =&gt; (x+y))val sum = flowers.aggregate(0)(_+_, _+_)
              Answer: 284

              解释: (0) - 是累加器 第一个+是分区内总和,加上花园每个象限中每个采摘者采摘的花朵总数。 第二个+是分区间总和,它聚合了每个象限的总和。

              案例 1:

              假设,如果我们需要在初始值之后减少函数。如果初始值不为零会怎样?如果是 4,例如:

              该数字将添加到每个分区内聚合以及分区间聚合:

              所以第一个计算是:

              11+12+13 = 36 + 5 = 41
              24+25+26 = 75 + 5 = 80
              35+36+37 = 108 + 5 = 113
              24+25 +16 = 65 + 5 = 70
              

              这里是初始值为5的分区间聚合计算:

              partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309
              

              因此,进入您的查询:总和可以根据 rdd 数据分布的分区数来计算。我认为您的数据分布如下,这就是为什么您的结果为 (19, 4)。因此,在进行聚合操作时,请指定分区值的数量:

              val list = sc.parallelize(List(1,2,3,4))
              val list2 = list.glom().collect
              val res12 = list.aggregate((1,0))(
                    (acc, value) => (acc._1 + value, acc._2 + 1),
                    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
              )
              

              结果:

              list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
              list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
              res12: (Int, Int) = (19,4)
              

              解释:因为你的数据分布在8个分区,结果是这样的(使用上面解释的逻辑)

              分区内加法:

              0+1=1
              1+1=2
              0+1=1
              2+1=3
              0+1=1
              3+1=4
              0+1=1
              4+1=5
              
              total=18
              

              分区间计算:

              18+1 (1+2+1+3+1+4+1+5+1) = 19
              

              谢谢

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 2018-03-16
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                相关资源
                最近更新 更多