目录
一、MapReduce的核心思想
二、MapReduce的整个工作流程
三、Shuffle的过程
3.1 为什么要进行Shuffle
3.2什么时候需要Shuffle
3.2 Shuffle的详细过程
四、Shuffle过程中延伸出的重点问题
1、环形缓冲区的大小/shuffle过程的瓶颈,你会怎么解决
2、分区相关
2.1为什么要进行分区?
2.2分区的原理
3、排序相关
3.1为什么在溢写过程中使用快排而不是堆排
3.2shuffle过程中的四次排序
4、注意Combiner组件的使用
5、Reduce抓取的数据放到了哪里
五、MR过程中的重点问题
1、切片相关
1.1切片原理
(整个切片过程在getSplit()方法中完成)
1.2切片大小设置
2、为什么在Map和Reduce阶段系统默认必须要对Key进行排序?
3、常用的切片机制
4、如果分区数不是1,但是ReduceTask为1,是否执行分区过程
一、MapReduce的核心思想
(1)分布式的运算程序往往需要分成至少 2 个阶段。
(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
二、MapReduce的整个工作流程
1、MapReduce框架使用InputFormat模块做map前的数据预处理,该模块包括两个方法:RecorderReader进行读取,还有一个方法用来判断是否可以切割。InputSplit是MapReduce对文件进行处理和运算的基本单位,只是逻辑切分,而非物理切分,它只包含一些元数据信息,比如数据起始位置,数据长度,数据所在节点等。
2、RecordReader(RR)根据InputSplit中的信息来处理InputSplit中的具体记录,加载数据并将其转换为适合Map任务读取的键值对,输入给Map任务。
3、客户端再提交之前会向yarn提交申请运行MRappmaster的资源,之后yarn会分配资源启动mrappmaster,mrappmaster会根据客户端的资源提交路径(hdfs路径),包括切片等信息,来计算MapTask的数量,之后向yarn申请资源开始执行maptask;
4、运行map任务,Map任务会根据用户自定义的映射规则,输出一系列的
5、为了让Reduce可以并行处理Map的结果,需要对Map的输出进行一定的分区(Partition),排序(Sort),合并(Combine),归并(Merge)等操作。得到
6、Reduce以一系列
三、Shuffle的过程 3.1 为什么要进行Shuffle
为了让reduce可以并行处理Map的结果。因为对于reduce来说,处理函数的输入是key相同的所有value,但是这些value的所在的数据集(也就是map的输出)在不同的节点上,因此需要对map的输出进行重新组织,使得同样的key进入相同的reducer,也就是说需要把map产生的无序的
3.2什么时候需要Shuffle
shuffle移动了大量的数据,对计算、内存、网络和磁盘都有巨大的消耗,因此,只有确实需要shuffle的地方才应该进行shuffle,否则尽可能避免shuffle。这也是为什么有些任务能只在map端完成就在map端完成,因为此时不需要Shuffle,效率会高。
什么时候需要Shuffle?
1、去重操作:distinct等;2、聚合,byKey类操作:group By Key、sort By Key等;3、排序操作:sort By Key等;4、join操作:两个表进行join,就必须将相同join key的数据,shuffle到同一个节点上,然后进行相同key的两个表数据的笛卡尔乘积。
3.2 Shuffle的详细过程
当运行map任务输出一系列
1、当环形缓冲区达到80%时,开始溢写到本地磁盘;--->默认缓冲区大小是多少,可以不可以增大
2、在溢写的过程中,会调用partitioner进行分区,并针对每个分区内部按key进行排序(快速排序);---->为什么分区、分区的原理、为什么是快速排序
3、可以选择Combiner进行合并,提前进行分区间聚合;---->所有任务都适合Combiner合并吗
4、当某一个maptask溢写完成之后,这个maptask产生的多个溢写文件会合并成大的溢写文件放在本地磁盘,在合并过程中,会对同一分区的数据根据key进行归并排序,形成分区间有序;
注意这里:
5、ReduceTask会根据自己的分区号,从各个maptask机器上提取自己分区的数据;--->这些数据抓取过来放到哪了 内存or磁盘?
6、reducetask会抓取到同一分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)形成大文件,shuffle结束;后面进入ReduceTask 的逻辑运算过
程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
四、Shuffle过程中延伸出的重点问题 1、环形缓冲区的大小/shuffle过程的瓶颈,你会怎么解决
大小默认是100M,放在内存中的,可以看做是数组实现的。Shuffle 中的缓冲区大小会影响到MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘io 的次数越少,执行速度就越快。
溢写比例默认为80%。如果比例过小,会进行频繁的溢写和排序,影响效率;如果比例过大,会导致溢写不及时,环形缓冲区满了,数据无法写入。
2、分区相关 2.1为什么要进行分区?
相关参考:https://www.cnblogs.com/hadoop-dev/p/5903341.html
因为在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。
2.2分区的原理
分区类:HashPartitioner
HashPartitioner是处理Mapper任务输出的,getPartition()方法有三个形参,key、value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1。那么任何整数与1相除的余数肯定是0。也就是说getPartition(…)方法的返回值总是0。也就是Mapper任务的输出总是送给一个Reducer任务,最终只能输出到一个文件中。
据此分析,如果想要最终输出到多个文件中,在Mapper任务中对数据应该划分到多个区中。那么,我们只需要按照一定的规则让getPartition(…)方法的返回值是0,1,2,3…即可。
假设我们按照性别分区,那么可以覆盖Partitioner类的getpartition(…)方法:
这样,我们通过自定义的分区类,重写了getPartition()方法,再通过以下两个设置,就完成了自定义分区。
在Job驱动中,设置自定义Partitioner:job.setPartitionerClass(CustomPartitioner.class);
自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask:job.setNumReduceTasks(5);
综上可知:
1)(1)如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;(2)如果1
(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
(4)分区号必须从零开始,逐一累加。
2)分区原理:key.hashcode() % numReduceTask
3)一个maptask会溢写出多个文件(归并),而一个任务又通过切片机制分成了多个maptask任务。
3、排序相关
参考:Hadoop和Spark为什么要对key进行排序_Hoult丶吴邪-CSDN博客https://blog.csdn.net/hu_lichao/article/details/109966711
3.1为什么在溢写过程中使用快排而不是堆排
因为环形缓冲区是在内存中的,快排时间复杂度:O(nlongn),空间最优:O(logn),最差为O(n),为了提高效率节省空间。
3.2shuffle过程中的四次排序 对于 MapTask ,它会将处理的结果暂时放到环形缓冲区中, 当环形缓冲区使 用率达到一定阈值后,在溢写之前,对缓冲区中的数据进行一次快速排序,之后将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于 ReduceTask ,它从每个 MapTask 上远程拷贝相应的数据文件,先溢写到内存,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次 归并排序 以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。 4、注意Combiner组件的使用
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行;Reducer是接收全局所有Mapper的输出结果;
(4)Combiner的意义就是对在写入文件之前,对每个分区中的数据进行一次聚集操作,以减小网络传输量。
Combiner的输出是Reducer的输⼊,Combiner绝不不能改变最终的计算结果。Combiner只应该⽤用于那种Reduce的输入key/value与输出key/value类型完全一致,且不不影响最终结果的场景。比如累加,最大值等,而求平均值就不能使用,结果错误。
5、Reduce抓取的数据放到了哪里
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,先溢写到内存,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
五、MR过程中的重点问题 1、切片相关 1.1切片原理 (整个切片过程在getSplit()方法中完成)
1.1切片原理 (整个切片过程在getSplit()方法中完成)
默认情况下,切片大小等于块大小(128M),对每一个文件进行切片,而不是整体的数据。并且每次切片时判断剩余文件部分是否大于切片大小的1.1倍,若小于切片大小,则剩余部分形成一个切片,大于则继续按切片大小切片,再继续切分。切片完成后,切片信息提交到yarn(mrAppMaster),mrAppMaster会启动相对应数量的MapTask,也就是有多少个切片就有多少个MapTask。
问题:1、为什么设置切片大小等于块大小?因为如果不设置为和块大小一样大,有的MapTask会跨服务器通讯,效率不高。2、所有文件都支持切片吗?并不是所有的文件都支持切片。有一些压缩文件,使用的压缩算法不支持切片,最终就只能是一片进行处理。
1.2切片大小设置
公式:Math.max(minSize, Math.min(maxSize, blockSize));
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,增加maptask数量;
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大,减少maptask数量;
2、为什么在Map和Reduce阶段系统默认必须要对Key进行排序?
因为在Reduce阶段是将相同Key的文件进行合并,如果不对key进行排序,需要一个一个判断是否是相同的key,加大reduce端排序的压力。效率不高;而按key排好序之后,只需要与后面的相比看是否是相同的key,如果不一样,直接将前面相同key的一组拿走进入到reduce,效率高。总之,排序后的数据对于后续的使用方便很多。
3、常用的切片机制
TextInputFormat(系统默认):按行读取每条记录。key:该行在整个文件中的起始字节偏移量,LongWritable类型。value:这行的内容,Text类型。切片机制:对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat:用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以尽可能的交给一个MapTask处理。