1. 提出问题

  相信很多面试也会问道类似的问题,那么这篇文章就是要围绕这个问题来进行展开进行分析Spark的资源动态分配。
  通常我们在通过spark-submit提交Spark应用到yarn集群的时候,都会配置num-executors来指定分配多少个Executor,但是对于经验不足的新手而言,经常会不知道如何分配合适的资源数,所以经常会选择尽量多分配资源,认为资源分配的越多任务运行的就会越快,当然在一定范围内确实是资源分配的越多任务就会运行的越快,但是也不是说这个多是无上限的无脑堆砌资源,实际上经常出现的情况就是分配资源过多,导致资源利用率不高资源浪费,更坏的情况是由于自己占据资源过多,导致集群中其它任务能够使用的资源就少了,从而影响到了其它任务的运行效率。
  为了解决这种问题,我们可以选择开启Spark的资源动态分配。在解释动态资源分配原理之前,我们可以先自己思考一下,如果你作为一个设计者,要设计动态资源分配的方案,应该考虑的大的点有哪一些。我个人认为主要有以下内容:

  • executor动态分配以为着executor的数量会依据任务运行状况动态调整,那么首先动态调整应该如何调整?例如要参考哪些参数来对数量进行调整,而且调整范围如何限定(上界和下界的确定)。
  • executor动态调整中,启动和关闭executor的时候也是需要时间的,这个过程中任务的状态也是在瞬息万变的,那么该如何均衡这中间的平衡性。即executor的新增和删除要考虑任务情况,不能过快或者过慢,这个速率该如何控制。
  • 新增或者是删除Executor这一行为所触发的具体条件是什么。
  • Executor不仅只是对外提供计算资源服务,熟悉Spark内存模型的人还会知道Executor还会提供RDD等数据的持久化,如果删除了Executor,那么这个Executor上面可能存在的缓存数据怎么处理才能保证可以被其它需要的Executor访问。
    还有一些细节暂时不做过多考虑,这里主要先思考这四个问题,围绕这四个问题来进行解答。

2. 设计分析

  以上的分析可以看出一点的是,不论是如何新增还是删除Executor,这中间可以引申出一个Executor生命周期的概念。首先,我们以不开启动态资源分配情况下的静态资源分配来分析一下Executor在任务运行期间的状态变化,采用的是spark-shell模式下提交Spark应用,执行命令如下:

# yarn模式下运行
$ spark-shell --master=yarn --num-executors=1 executor-cores=1

# 提交任务1的wordcount任务
$ sc.textFile("xxxx").flatMap(line=>line.split(" ")).map(w=>(w,1)).reduceByKey(_+_).count()

# 提交任务2的wordcount任务
$ sc.textFile("xxxx").flatMap(line=>line.split(" ")).map(w=>(w,1)).reduceByKey(_+_).count()

  以上过程中,任务1和任务2中间会休息一段时间,那么这个过程中,Executor的运行状态可以如下图所示:
聊一聊Spark资源动态分配

  可以发现,这两次任务之间隔得时间如果较长,那么中间集群资源是存在浪费情况的。那么针对这种情况,我们来开启动态资源分配,执行任务完全一样,再来分析过程,如下:
聊一聊Spark资源动态分配
  接下来,我们针对动态资源分配的图,对各个步骤进行介绍:

    1. spark-shell start: 启动spark-shell应用
    1. Executor1 Start:启动Executor1
    1. Job1 Start:任务1提交成功,此时Executor进入Busy状态。
    1. Job1 End: 任务1运行结束,此时Executor1进入空闲Idle状态。
    1. Executor1 Timeout:Executor1空闲一段时间后,超时被kill掉。
    1. Job2 Submit:提交第二次任务,此时发现没有可运行的Executor,Job2暂时处于Pending状态等待资源。
    1. Executor2 Start:过了一段时间限制(这个时间可设置),发现Job2还是没有获取到可执行任务的Executor,那么启动一个Executor2来提供给Job2执行。
    1. Job2 Start:此时,Executor2启动成功,Job2发现Executor2可以使用,那么去Executor2上执行任务。
    1. Job2 End:Job2执行完毕,Executor2进入Idle状态。
    1. 后续如果spark-shell没有关闭,那么如果没有任务,又会出现Executor2被kill掉。

  以上流程,需要关注的几点有:

  • Executor超时:当Executor不执行任何任务时,会被标记为Idle状态。空闲一段时间后即被认为超时,会被kill。该空闲时间由spark.dynamicAllocation.executorIdleTimeout决定,默认值60s。对应上图中:Job1 End到Executor1 timeout之间的时间。
  • 资源不足时,何时新增Executor:当有Task处于pending状态,意味着资源不足,但是此时不会立即新增Executor,会等待spark.dynamicAllocation.schedulerBacklogTimeout配置的时间(单位秒,默认1s),超过这个时间任务还是处于Pending状态,此时需要增加Executor。
  • 该新增多少Executor:新增Executor的个数主要依据是当前负载情况,即running和pending任务数以及当前Executor个数决定。用maxNumExecutorsNeeded代表当前实际需要的最大Executor个数,maxNumExecutorsNeeded和当前Executor个数的差值即是潜在的新增Executor的个数。注意:之所以说潜在的个数,是因为最终新增的Executor个数还有别的因素需要考虑,后面会有分析。下面是maxNumExecutorsNeeded计算方法:
private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
              tasksPerExecutorForFullParallelism)
      .toInt
  }
  1. 其中numRunningOrPendingTasks为running和pending状态的任务数之和。
  2. executorAllocationRatio意思很显然就是Executor分配的比率。单纯论效率而言,最快的情况下就是为每个pending状态等待的任务新增一个Executor,这样就是并行度最大化。但是,实际情况下这样往往会导致资源浪费,因为很可能某个任务申请的Executor还没启动,会出现一些其它任务执行完毕处于空闲状态的Executor出现,这种情况下其实就完全没必要去新增Executor浪费资源了。这个值取值范围是[0,1],默认是1。
  3. tasksPerExecutorForFullParallelism:每个Executor的最大并发数,简单理解为:cpu核心数(spark.executor.cores)/ 每个任务占用的核心数(spark.task.cpus)。

  继续回到之前设计过程提到的一些需要解决的问题,由于集群状态时刻是在变化的,如何调整Executor的数量?新增和删除Executor的上下界分别如何确定?
  对于这个问题,Spark提供了以下参数来控制:

  • spark.dynamicAllocation.minExecutors:调整Executor时的下界,即最少要有多少个Executor,默认为0。
  • spark.dynamicAllocation.maxExecutors:调整Executor时的上界,即最多要有多少个Executor,默认为Integer.MAX_VALUE
  • spark.dynamicAllocation.initialExecutors:Executor初始数量,这个一般是在启动命令参数中配置的,否则默认为minExecutors
      这三个参数就确定了初始Executor和上下界的问题。

注意:当spark.dynamicAllocation.schedulerBacklogTimeout秒内一直有pending状态的任务时,会触发新增Executor的请求,过了这段时间后,再过spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒如果还是存在pending状态的任务,那么会再次触发请求新增Executor。并且,这个轮询过程中,每次请求的Executor数量是和上次承指数增长的。即第一次请求1个Executor,之后会分别请求2、4、8个依次增加。

另外一个十分重要的问题,对于要被kill的Executor上持久化的数据该如何处理?
  如果Executor中缓存有数据,那么该Executor的空闲时间(上图中的Executor的timeout过期时间)就不是由spark.dynamicAllocation.executorIdleTimeout来决定的了,而是由spark.dynamicAllocation.cachedExecutorIdleTimeout来决定的,这个参数默认值是无限大的,意思就是说如果我们设置了动态资源调整,并且存在缓存的RDD等数据,如果不设置cachedExecutorIdleTimeout这个参数,那么可能就会出现Executor因为包含有缓存数据而无法被kill掉一直存活。

这里先需要注意一点的是,对于持久化存储在Executor内存中的数据而言,是多种多样的,主要可以分为两大类,一类是在本地磁盘有备份,另一类是在本地磁盘没有备份。那么对于在本地磁盘有备份的数据而言,主要指的是Shuffle map端操作产生的RDD,这类RDD会将数据溢写到本地磁盘,然后作为数据源提供给Shuffle reduce端来进行数据的读取;而对于另外一类的数据典型的就是我们在代码规范中说的一点对于后续会有多个不同分支执行不同业务计算的RDD,最好提取cache从而减少后续不同分支的重复计算,对于这种情况下很可能就是多个transform操作链路中间出现分支,所以对中间链路的RDD执行了手动cache,这种情况下缓存的RDD计算结果在内存中是不会溢写一份到磁盘中的,所以这类数据如果丢失了是需要依据DAG重新计算的。

  了解以上Executor中数据不同类型后,针对Executor被kill的情况,spark官网也考虑到了不同类型数据造成的影响不一样,所以默认的对于有缓存的数据,spark.dynamicAllocation.cachedExecutorIdleTimeout默认设置无限大从而避免有缓存的Executor被kill而导致需要重新计算影响性能。但是这样其实对于资源的动态调整是有一定负面影响的,但是官方暂时未提供后续优化解决办法,只是文档中提出后期版本可能会选择把缓存的数据放到堆外内存中,这样就可以脱离对Executor的依赖,从而Executor即使被kill了也无所谓。
  以上就是关于动态资源分配的介绍,本文由于个人较懒,很多内容均copy来自参考博客,并查阅了相关官方文档。
参考博客:

https://www.jianshu.com/p/ec7966919dbe
https://www.jianshu.com/p/5749ad5d48e6
https://www.jianshu.com/p/3e884647532c
http://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors

相关文章:

  • 2021-07-29
  • 2022-12-23
  • 2021-09-28
  • 2021-11-20
  • 2022-12-23
  • 2021-06-22
  • 2021-09-23
  • 2022-12-23
猜你喜欢
  • 2021-06-01
  • 2021-11-20
  • 2021-10-13
  • 2021-12-08
  • 2022-12-23
  • 2021-05-31
相关资源
相似解决方案