--绿字猜测--红字疑问

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle

 

Mapreduce引擎

map输出键值对数据不断写入环形缓存区(内存),到达阀值后spill溢写(这个过程中是按照partition和key值升序排序写入,如果有调用combiner则在排序后再调用combiner),一次溢写会在磁盘上产生一个小文件,全部溢写完成得到多个小文件,对这些小文件进行merge,得到一个合并文件(即一个map任务只产生一个中间数据文件)(数据是全局排序的,排序算法见笔记Spark中sortshufflewriter排序详解)。以上map端的shuffle结束。reduce需要访问map的结果数据取得想要的数据(自己的reduce对应的分区数据),如果某一个map数据不大,可以直接放在内存中,等存储的map数据内存放不下的时候会把这个内存中的数据merge成一个文件写入磁盘(如果一个map的结果就超过了内存那就直接写磁盘)

MapReduce和Spark相关原理_Shuffle

MapReduce和Spark相关原理_Shuffle

 

Spark引擎

主要分成两个,Hash shuffle和Sort shuffle

Hash shuffle

已经基本淘汰,主要是由于产生的小文件太多,一个map的结果数据可能会拆成r个buffer

文件数量m*r,m为map数,r为reduce数

MapReduce和Spark相关原理_Shuffle

Sort shuffle

分为普通机制和bypass机制

普通机制:map数据先将数据写入一个内存数据结构中(默认5m),写入大小到达一个阀值后会将这部分数据分批(默认batch=10000)写入磁盘生成一个小文件(在往磁盘写的时候会做排序)最后会做一个merge,只产生一个文件,此处会做全局排序吗,另文件数量是依据map结果的文件数量还是说是reduce的task数量

MapReduce和Spark相关原理_Shuffle

 

bypass机制:相比上面的普通机制少了排序的动作

触发条件:

1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

2)不是聚合类的shuffle算子(比如reduceByKey)。

MapReduce和Spark相关原理_Shuffle

 

 

Spark中SortShuffleWirter排序方法详解

开始前先考虑一个问题,假如有100亿条数据,但是内存只有1M,但是磁盘很大,要对100亿条数据进行排序,是不可能将所有的数据加载到内存中进行排序,这就涉及到外部排序的问题,假设1M内存只能装1亿条数据,每次对1亿条数据排序,生成100个排序好的文件,最后怎么将这100个文件进行Merge生成一个全局有序的大文件。我们可以取每个文件的头数据读入内存作为buffer,并把100个buffer放在一个堆中进行排序,比较的方式是每个buffer的head元素(第一条数据)进行比较,然后不断的把每隔堆顶的buffer的head元素pop出来输出到最终的结果文件中,然后继续排序,继续输出,如果那个buffer空了就去对应的文件中补充数据,直到所有的数据排好序。SortShuffleWirter的原理大概就是这个样子。

 

 

相关文章: