【问题标题】:spark reduce function: understand how it worksspark reduce 功能:了解它的工作原理
【发布时间】:2016-07-12 08:47:03
【问题描述】:

我正在接受这个course

它表示 RDD 上的 reduce 操作是一次在一台机器上完成的。这意味着如果您的数据分布在两台计算机上,那么下面的函数将处理第一台计算机中的数据,将找到该数据的结果,然后它将从第二台计算机获取单个值,运行该函数,它将继续这样,直到它完成机器 2 中的所有值。这是正确的吗?

我以为该函数会同时在两台机器上开始运行,然后一旦有两台机器的结果,它将再次运行该函数最后一次

rdd1=rdd.reduce(lambda x,y: x+y)

更新 1--------------------------------------------

与减少功能相比,以下步骤会给出更快的答案吗?

Rdd=[3,5,4,7,4]
seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
collData.aggregate(0, seqOp, combOp)

更新 2-----------------------------------

下面的两组代码是否应该在相同的时间内执行?我检查了一下,似乎两者都需要相同的时间。

import datetime

data=range(1,1000000000)
distData = sc.parallelize(data,4)
print(datetime.datetime.now())
a=distData.reduce(lambda x,y:x+y)
print(a)
print(datetime.datetime.now())

seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
print(datetime.datetime.now())
b=distData.aggregate(0, seqOp, combOp)
print(b)
print(datetime.datetime.now())

【问题讨论】:

    标签: apache-spark reduce


    【解决方案1】:

    reduce 的行为在本机 (Scala) 和来宾语言 (Python) 之间略有不同,但简化了一点:

    • 每个分区按元素顺序处理
    • 多个分区可以由单个工作人员(多个执行程序线程)或不同工作人员同时处理
    • 部分结果被提取到应用最终归约的驱动程序(这是在 PySpark 和 Scala 中具有不同实现的部分)

    由于您使用的是 Python,让我们看一下代码:

    1. reduce creates a simple wrapper 用于用户提供的功能:

      def func(iterator):
          ...
      
    2. 这是包装is used to mapPartitions:

      vals = self.mapPartitions(func).collect()
      

      很明显这段代码是并行的,并不关心结果是如何被利用的

    3. 使用标准 Python reduce 在驱动程序上按顺序减少收集的 vals

      reduce(f, vals)
      

      其中f 是传递给RDD.reduce 的函数

    相比之下,Scala 会异步合并来自工作线程的部分结果。

    treeReduce 的情况下,步骤 3. 也可以以分布式方式执行。见Understanding treeReduce() in Spark

    总结reduce,不包括驱动程序端处理,使用与mapfilter等基本转换完全相同的机制(mapPartitions),并提供相同级别的并行性(再次不包括驱动程序代码)。如果您有大量分区或f 很昂贵,您可以使用tree* 系列方法进行并行/分发最终合并。

    【讨论】:

    • 我看了你的回答。我很难理解您的意见,也很难弄清楚课程中的陈述是否正确。基于“可以由单个工作人员(多个执行程序线程)或不同工作人员同时处理多个分区”,该语句似乎是不正确的。请直接回答?请使用示例突出显示您所说的内容 - 例如 RDD 是 [1,2,3,4,5,6] 和 [1,2,3] 在一台机器上,其余元素在另一台机器上.. spark和scala如何分别处理这些?感谢您的工作
    • 我没有看过课程,所以我不能提及,但如果他们真的告诉你它是在你已经腰围 $200 的时候机器完成的。 reduce,不包括驱动部分,使用与标准 Spark 转换相同的机制,因此表现出相同的并行性。
    • 请使用示例突出显示您所说的内容 - 例如 RDD 是 [1,2,3,4,5,6] 和 [1,2,3] 在一台机器上,其余的另一台机器上的元素.. spark 和 scala 如何以不同的方式处理这些?也可以回答我更新的问题吗?
    • a) aggregatereduce 之间应该没有显着的性能差异 b) 我不能使用示例,因为通常顺序不是确定性的。你可以看到非常粗略的可视化here 但基本上操作是不同步的。 3)关于 Scala - 就像我已经说过的 - Scala 异步获取任务结果而不是通过收集。
    • @zero323 从您的回答中,我了解到reduce 也最终在驱动程序中执行,对吗?谢谢
    猜你喜欢
    • 2015-10-24
    • 2015-04-17
    • 1970-01-01
    • 1970-01-01
    • 2014-01-27
    • 1970-01-01
    • 2016-02-20
    • 2012-12-11
    • 2011-02-18
    相关资源
    最近更新 更多