1. MapReduce编程模型
-
MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
-
一些复杂或计算量大的任务,单台服务器无法胜任时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;最终再汇总每个小任务的结果
-
MapReduce由两个阶段组 成:Map阶段(切分成一个个小的任务)、Reduce阶段(汇总小任务的结果)。
1.1 Map阶段
-
map()函数的输入是kv键值对,输出是一系列kv键值对,输出结果写入本地磁盘。
1.2 Reduce阶段
-
reduce()函数的输入是kv键值对(即map的输出kv键值对);输出也是一系列kv键值对,最终写入HDFS
2. MapReduce编程案例
2.1 MapReduce原理图
以单词统计为例:
-
block对应一个split分片,一个split对应一个map task
-
reduce task的个数由程序中编程指定
上图有3个Map(split数量=map数量)和3个Reduce加起来就有6个独立的进程空间,当这个MapReduce作业跑起来的时候,需要占用6个进程空间,这些进程(或则说算子)中有可能在同一个节点运行,也有可能都在不同节点上上运行的。
MapReduce计算框架-map执行流程:
MapReduce的数据源是来自HDFS上面的,数据源都是以File的方式表示
通过一个inputFormat类 (这两个功能其实是在inputFormat中封装好的)
第一个功能是:数据切割(Data Splits)-数据切片
第二个功能是:记录读取器(Record Reader):读取每一个block中的一条条record记录,读到map 缓冲区buffer
shuffle过程:(包含 partition,sort,spill,combiner,copy...)
1. 内存缓冲区默认是100m,如果100M内超过了80M,就会把这80M的数据给锁住,然后将80M的数据先根据hash(key)%reduce个数 ,生成{partition,key,value}形式,然后进行排序并且写入本地磁盘,最后清空80M内存,最前面的数字表示(partition number)未来这条数据被分到reduce的哪个桶。
2.溢写磁盘前第一步已经分号区了,还需要进行sort排序,根据key来排好序,然后溢写磁盘中的数据就是有序的了.
3.1.4、Combiner(数据合并)
Combiner:相当于把部分的redeuce提前在map阶段做,具体怎么做的呢?
在map缓冲区buffer里,还没进行数据合并操作,如果说把相同的key的value提前累加起来,通过把相同的key进行合并,合并完了以后spill到磁盘上的数据要远远小于不合并前的数据,这算是一个很大的优化,提高了数据的传输效率。
Combiner不能乱用???如果用的不好反而会是你的程序或者代码得到一个错误的结果
举例:如下 求中值(就是求中间的值)
1. 3 5 2 6 9 4做个排序 2 3 4 5 6 9 中间的值4 5 然后取一个最大值:5就是中值
2. 如果用commbiner做的话就会出错,比如右边2个map,第一个3 5 2 数据中值是3
第2个map中值是6,最终结果就会是6,就是错的数据。
MapReduce计算框架-reduce执行流程:
Reduce里面也有一个内存缓冲区,这个内存缓冲区跟Map的内存缓冲区一样
真正到了Reduce里面,因为Partition是已经提前做了,那肯定会保证相同的Partition的数字记录{"2","hive","1"},肯定会统一发送到后面相同id的Reduce上面去进行进一步处理
下面这个图也很经典,再次介绍下MapReduce原理
map阶段,一个map其实就是对应着一个split分片。
1.首先inputFormat类对hdfs上的数据进行split,通常一个split大小和block大小相等,split后的分片会传给多个map,但一个map只能处理一个分片,把一个split分片读进来到map,读的时候需要用inputFormat类提供了RecordReader方法的实现,可以从Split分片中以行为单位读取数据,供Mapper处理,通常读取一行数据,调用一次Map程序,因为map是一个程序,会启动了一个进程,自己维护了一个进程空间,那把split数据读进来之后,把数据直接存到了内存(buffer in memory)上。
1.1:开始往内存写数据时是有限制的,默认占用内存大小是100M,当写到80M的时候,占整个内存的80%就不能再写了,只能往20%上去写,那把这个80%已经写好的数据开始统一的往磁盘上写 并且清空原先锁住的80%的内存数据,但是转储的过程中不是简简单单的把内存数据直接存到了磁盘上。(每80M内存数据,排序后落磁盘)
1.2:在落磁盘过程中 中间还需要涉及到了一次sort排序,因为数据不断的引入,那内存(buffer in memory)就会慢慢的产生出很多很多的小的数据(因为跑任务一个map可以分配的大小一般在500-800M,所以读进来的80M内存数据块会很多很多),然后每一个数据块都有不同的区域(partition),然后把内存的数据往磁盘上去写的时候它会做排序。
(分区排序然后spill溢写磁盘)
比如说在内存里面都是一些100元钞票的数据,50元钞票的数据,10元钞票的数据,开始读数据的时候,数据是杂乱无章,通过一次排序之后会发现100元的会排到一块,50元排到一块,10元排到一块,无论是从大到小排还是从小到大排最后这个key肯定是排到一块很整齐的队列。
假设说就只有这三张不同的面值,那这个时候(如下图8所示)
最上面第一个长方形是一个小的数据文件,里面又切成了三个部分-partition(如上图8所示)那这三个partition都代表着100元钞票和50元钞票和10元钞票的数据。
接下来又产生出了一个新的数据文件(如上图8中间那个长方形数据),新文件可能是100元占的很少,而50元占得很多,而10元占得比较少,然后后面它慢慢又产出新的文件数据。
1.3:Map中溢写的文件如何合并??
随着数据不断的引入,从Map里面会产生出很多很多的小数据文件。这时候磁盘上已经有很多这样小的文件了,那应该把这些小的文件进行合并成一个大文件,怎么合并 并且要保证数据准确的顺序性???,那从小文件进行合并是怎么得到的呢?就是用了一次归并排序。
归并排序:开始把指针都指向各自文件的文件头上,然后开始互相之间比较,就是比如三个文件,每一个文件都有一个指针,然后相当于每一个指针都相当于按照从大到小已经排序好的,那么这个时候去往这个大的数据上开始合并。比如说现在指针指向了100面值的文件头上,那么100面值的往后写 归并,那么归并排序的前提是你指的每一个数据的都是先排好序的,这就是归并排序。然后做好归并排序之后产生大的文件也是排好序的。
2:Reduce阶段:提交任务可以设置reduce的个数,只看一个reduce(如下图9所示)
图9:
比如这个reduce只处理100元面值的金额的钞票,把前面每个map阶段属于100元的区域(partion)的数据全部归纳到reduce里面的merge节点机器上,把数据硬拷贝过去reduce里面,那reduce这机器上面发现可以拷贝出多个数据(merge),那这个数据量也是很大的,因为从其他map中数据小的文件比较多,这时候就会慢慢的再去做合并(merge),通过不断的合并慢慢的就合并成大的数据,然后再统一的扔给reduce,然后reduce再进行数据计数工作最后输出,这么一个过程。
当然reduce读的过程中,它也是把数据读到reduce机器的内存中,然后在内存里面也会有相应的排序。
一个reduce可以处理不同的数据也可以处理一种面值的数据
或者是一个reduce可以处理多个面值钞票。
- Map阶段的shuffle是什么??
重新洗牌的意思,对map阶段中读取split中的数据到内存中80%溢写之前需要shuffle,
Shuffle包含partition、sort、commbiner、spill、meger、...
- Map阶段的分区partition是什么??
对读到map缓存区buffer的数据进行分区partition--用hash(key)取模reduce数量来产生{partition,key,value},每80M内存数据先partition好,然后根据key来sort排好序,然后commbiner提前合并数据较少partition数量,最后溢写
- Map阶段在溢写数据到磁盘前如何排序sort???
每溢写80M数据需要先partition--用hash(key)取模reduce数量来产生{partition,key,value},每80M内存数据先partition好,然后根据key来sort排好序,那么每一个80M形成的所有分区都是排好序的
- 为什么会有sort排序???
缓冲区的数据不经过排序直接存到磁盘上数据会很乱,排序目的是把相同的key排到一块,然后把缓冲区排序好的数据输出到磁盘上,每次输出都产生了很多小的文件数据(Spill.n)。