【问题标题】:In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map在Apache spark中,使用mapPartitions和结合使用广播变量和地图有什么区别
【发布时间】:2016-01-08 00:21:45
【问题描述】:

在 Spark 中,我们使用广播变量来使每台机器都拥有一个变量的只读副本。我们通常在闭包之外创建一个广播变量(例如闭包所需的查找表)以提高性能。

我们还有一个名为 mapPartitions 的 spark 转换运算符,它试图实现相同的目的(使用共享变量来提高性能)。例如,在 mapPartitions 中,我们可以为每个分区共享一个数据库连接。

那么这两者有什么区别呢?我们可以将它互换用于共享变量吗?

【问题讨论】:

    标签: java scala apache-spark


    【解决方案1】:

    broadcast 用于将对象传送到每个工作节点。该对象将在该节点上的所有分区之间共享(并且值/即对象对于集群中的每个节点都是相同的)。当您在工作节点上的许多不同任务/分区中使用相同的数据时,广播的目标是节省网络成本。

    mapPartitions 相比之下,是 RDD 上可用的方法,与map 类似,仅在分区上工作。是的,您可以定义新对象,例如 jdbc 连接,然后每个分区都是唯一的。但是,您不能在不同的分区之间共享它,更不用说在不同的节点之间共享了。

    【讨论】:

    • 我明白了,主要区别是级别对吗?广播在节点级别,但 mapPartitions 在分区级别。
    • 从某种意义上说,是的。但是,用法(特别是语法)是如此不同,以至于我不愿在这两种情况之间进行比较。通常您广播一个现有的数据数组,但在 mapPartitions 中您在该级别创建一个新对象。顺便说一句,广播是在集群级别,而不是节点级别。
    【解决方案2】:

    虽然 KrisP 提供的答案突出了所有重要差异,但我认为值得注意的是,mapPartitions 只是高级转换背后的低级构建块,而不是实现共享状态的方法。

    虽然mapPartitions 可用于明确共享喜欢的状态,但在技术上它不共享(其生命周期仅限于mapPartitionsclosure`),还有其他方法可以实现它。特别是,在闭包内引用的变量在分区内共享。为了说明这一点,让我们玩一下单例:

    object DummySharedState {
      var i = 0L
      def get(x: Any) =  {
        i += 1L
        i
      }
    }
    
    sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
    // res3: Long = 100
    sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
    // res4: Long = 50
    sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
    // res5: Long = 2
    

    和 PySpark 中类似的东西:

    • 单例模块dummy_shared_state.py:

      i = 0
      def get(x):
          global i
          i += 1
          return i
      
    • 主脚本:

      from pyspark import SparkConf, SparkContext
      import dummy_shared_state
      
      master = "spark://..."
      conf = (SparkConf()
          .setMaster(master)
          .set("spark.python.worker.reuse", "false"))
      
      sc.addPyFile("dummy_shared_state.py")
      sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
      ## 100
      sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
      ## 50 
      

    请注意,spark.python.worker.reuse 选项设置为 false。如果您保留默认值,您实际上会看到如下内容:

    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
    ## 50
    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
    ## 100
    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
    ## 150
    

    归根结底,您必须区分三种不同的事物:

    • 广播变量,旨在减少网络流量和内存占用,方法是将变量的副本保留在工作线程上,而不是随每个任务一起提供
    • 在闭包外部定义的变量和在闭包内部引用的变量必须随每个任务一起提供并为此任务共享
    • 在闭包内定义的变量不共享

    除此之外,还有一些与持久解释器的使用相关的 Python 特定问题。

    在变量生命周期方面,mapfilter 或其他转换)和mapPartitions 之间仍然没有实际区别。

    【讨论】:

    • 这真的很有帮助。我想知道 map 和 mapPartitions 之间的另一个区别。 map 在单行的基础上运行,而 mapPartitions 可以访问通过迭代器发送到该分区的所有行。是否可以根据上面的示例访问全局变量以及发送到分区的所有行?我假设 mapPartition 不能使用您在此处显示的全局范围变量。
    • @retrocookie map 是使用mapPartitions 实现的,所以这里没有区别。不过我会小心的。更重要的是展示保持持久解释器的含义。
    • 那么通过 mapPartition 访问的全局变量是否在节点级别而不是分区级别共享?
    • 比这要复杂一点。如果您询问 PySpark,那么答案是否定的。 Worker 是一个 JVM 进程,它使用套接字与 Python 执行器通信。所以那里没有全球 Python。
    • 所以使用默认选项时,每个执行程序有一个 python 解释器,可以用于多个分区,对吧?
    猜你喜欢
    • 2018-08-14
    • 1970-01-01
    • 1970-01-01
    • 2016-06-23
    • 2016-06-05
    • 2012-05-07
    • 1970-01-01
    • 2018-03-11
    • 2016-01-30
    相关资源
    最近更新 更多