1、MapReduce 概述2、MapReduce的优缺点
2.1、优点2.2、缺点 3、MapReduce 核心思想
3.1、MapReduce 计算流程3.2、MapReduce 编程规范 4、MapReduce [反]序列化5、MapReduce 框架原理
5.1、切片与 MapTask 并行度决定机制5.2、ReduceTask 并行度决定机制5.3、Job提交流程源码解析5.4、FileInputFormat 切片源码解析 6、MapReduce 切片机制
6.1、FileInputFormat6.2、TextInputFormat6.3、CombineTextInputFormat 7、MapReduce 工作流程
7.1、Shuffle 机制7.2、Partioan 分区7.3、WritableComparable 排序7.4、Combiner 归并7.5、压缩 8、MapReduce 工作机制
8.1、MapTask 工作机制8.2、ReduceTask 工作机制 9、MapReduce Join
9.1、Reduce Join9.2、Map Join 10、MapReduce 知识点
10.1、MapReduce 怎么实现 TopN?10.2、有可能使 Hadoop 任务输出到多个目录中么?如果可以,怎么做?10.3、Hadoop中RecordReader的作用是什么?
1、MapReduce 概述
MapReduce 是一个分布式运算程序的编程框架,是用户开发基于 Hadoop 的数据分析应用的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上,从而处理海量数据的计算问题。
2、MapReduce的优缺点 2.1、优点易于编程
它简单的实现一些接口,程序中并行场景的考虑因素比较少,方便编程。
良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,廉价机器更容易挂机,机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供海量数据处理能力。
2.2、缺点不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 自身的设计特点决定了数据源必须是静态的,不能动态变化,无法进行流式计算。
不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。MapReduce 实现的方式就是多个 MapReduce程序连接在一起。这样每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
3、MapReduce 核心思想 3.1、MapReduce 计算流程一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
MrAppMaster:负责整个程序的过程调度及状态协调。MapTask:负责 Map 阶段的整个数据处理流程。ReduceTask:负责 Reduce 阶段的整个数据处理流程。以下是计算流程:
分布式的运算程序往往需要分成至少 2 个阶段。
第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
3.2、MapReduce 编程规范Mapper阶段
用户自定义的Mapper要继承自己的父类
Mapper的输入数据是KV对的形式(KV的类型可自定义)
Mapper中的业务逻辑写在map()方法中
Mapper的输出数据是KV对的形式(KV的类型可自定义)
map()方法(MapTask进程)对每一个
Reducer阶段
用户自定义的Reducer要继承自己的父Reducer阶段Reducer的输入数据类型对应Mapper的输出数据类型,也是KVReducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
4、MapReduce [反]序列化 Java 的序列化是一个重量级序列化框架(Serializable),对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),网络传输效率低。所以Hadoop 自己开发了一套序列化机(Writable)。
Writable 序列化特点:
紧凑 :高效使用存储。
快速:读写数据的额外开销小。
互操作:支持多语言的交互。
自定义bean对象序列化传输步骤及注意事项:
必须实现Writable接口
反序列化时,需要反射调用空参构造函数,所以必须有空参构造
重写序列化方法
@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(id);out.writeString(name);out.writeLong(sumFlow);}
重写反序列化方法@Overridepublic void readFields(DataInput in) throws IOException {id = in.readLong; name = in.readString;}
注意反序列化方法中属性收集顺序和序列化方法中属性收集顺序完全一致
要想把结果显示在文件中,需要重写toString(),且用t分开,方便后续用
如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
@Overridepublic int compareTo(FlowBean o) {// 倒序排列,从大到小return this.id > o.getId() ? -1 : 1;}
5、MapReduce 框架原理MapReduce 计算流程大概分为:
向Yarn提交MapReduce Job使用切片机制对数据进行切片(获取原始数据,确定MapTask数量)Yarn MrAppMaster 调度启动 MapTask ,执行计算Yarn MrAppMaster 调度启 启动 ReduceTask, 执行计算返回结果 5.1、切片与 MapTask 并行度决定机制数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask,MapReduce默认使用TextFileInputFormat切片策略。
数据切片决定 MapTask 并发数量,这个 数据切片的默认值为 dfs.blocksize。这是由于 Yarn MrAppMaster在分配 MapTask 的时候,会优先考虑在存放Block的节点分配 MapTask,这样就能减少网络IO,提高效率。但是如果,数据切片大小不相等数据块大小,那么即使Block本地启动的MapTask也难免需要网络IO读取其他节点的Block,效率有所损失。
设置 ReduceTask 并行度(个数)
ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:
// 默认值是 1,手动设置为 4job.setNumReduceTasks(4);
ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
ReduceTask默认值就是1,所以输出文件个数为一个。
如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
具体多少个ReduceTask,需要根据集群性能而定。
如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
5.3、Job提交流程源码解析 MapReduce任务提交之后,会生成一个 Yarn 客户端,提交任务到Yarn集群(本地环境会提交到本地),提交流程如下:
MapReduce任务提交成功之后,就要开始执行任务了。执行任务第一步,当然是获取数据,然后启动MapTask,在这之前要先作切片处理了,下面是 FileInputFormat 切片机制的源码解析:
程序先找到你数据存储的目录。开始遍历处理(规划切片)目录下的每一个文件遍历第一个文件ss.txt 获取文件大小fs.sizeOf(ss.txt)计算切片大小computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))= blocksize = 128M默认情况下,切片大小=blocksize开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)将切片信息写到一个切片规划文件中整个切片的核心过程在getSplit()方法中完成InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。 6、MapReduce 切片机制 6.1、FileInputFormat
FileInputFormat 是 MapReduce 默认使用的切片机制,设计的核心思想是减少数据的网络IO传输,从而更高效的读取数据。有如下特点:
特点
简单地按照文件的内容长度进行切片
切片大小,默认等于Block大小
切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
源码中计算切片大小的公式
// 默认值为1mapreduce.input.fileinputformat.split.minsize=1;// 默认值Long.MAXValuemapreduce.input.fileinputformat.split.maxsize= Long.MAXValue;// 默认情况下,切片大小=blocksize。 Math.max(minSize, Math.min(maxSize, blockSize));
切片大小设置maxsize(切片最大值):参数如果比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。
6.2、TextInputFormatTextInputFormat 其实是 FileInputFormat 默认的实现类,按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。
6.3、CombineTextInputFormat框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
下面是 CombineTextInputFormat 切片步骤:
Map阶段:
Reduce阶段:
准备待处理文件。
主要是提交任务资源,生成切片规划,详见[MapReduce任务提交](# 5.3、Job提交流程源码解析)。
将上一步生成的 Job.xml,Job.split(切片规划) 及 mr 程序/依赖 jar包 提交给 Yarn ResourceManager。
yarn mrAppMaster 接手调度 MapReduce程序,并通过 job.split 文件请求 NodeManger 启动 Container,在Container环境中,执行 MapReduce 程序。
MapReduce 程序第一步,获取数据,一般是从 HDFS 中去获取数据,切片详见 [ MapReduce 切片机制](#6、MapReduce 切片机制)。
执行 MapReduce 的逻辑代码,将计算的结果通过 outputCollector 写入到环形缓冲区。
7 ~ 15 为 Shuffle 操作,详见 [Shuffle机制](#7.1、Shuffle 机制)
数据处理完成后,进入 Reduce 方式,执行聚合逻辑,计算完成,使用 TextOutputFormat 写出结果到HDFS。
7.1、Shuffle 机制Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。我们对Shuffle的期望是:将Map的输出数据完整地传输到Reduce端。 在传输数据时,尽可能得减少不必要的带宽消耗。 降低磁盘I/O的影响。步骤如下:
Map程序执行完成之后,结果先进入 getParitition( ),getParitition( )默认使用 hashPartition( ),即(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;。如果需要自定义分区规则,就要继承 Partition 抽象类,重写 getPartition( ),再任务中提交分区则Job.setPartitionClass( ),并且控制好 ReduceTask 数量Job.setNumReduceTasks()。为数据打上分区标记后,随后进入环形缓冲区。
环形缓冲区:
同时存储数据集 以及 数据集索引的内存区域,索引由 数据的index, partition, keyStart, valueStart组成,数据是由 key, value, unsued组成。 环形缓冲区 默认 100M,当数据存储空间达到环形缓冲区的 80% 时(为了在旧数据溢写时可以同时写入新数据),环形缓冲区内的数据开始反写新数据 并 准备溢写旧数据(如果旧数据还没溢写完成,新数据已经占满内存,则新数据需要等待旧数据溢写完成,保证数据安全)。环形缓冲区的亮点在于,他可以同时兼顾 旧数据的溢写(持久化) 和 新数据的输入,并缺保证数据安全,效率相当高。
环形缓冲区旧数据准备溢写,溢写之前,将 环形缓冲区 中保存的索引默认按照分区 以及 key的字典顺序快排排序,如果想要自定义排序规则,则需要在key类中实现WritableComparable 接口重写 compareTo 方法,索引保存了数据的原地址,排序完成后修改索引保存的数据地址,就可以快速按序写入数据。
Combiner 是一个 独立与 Mapper 和 Reducer 之外,在 MapTask所在节点运行的以 Reducer 为父类的组件。由于 MapTask 数量远大于 ReduceTask 数量,所以 ReduceTask 成为了整个运行过程的性能短板,为了提高 ReduceTask 工作效率, MapReduce 提供 Combiner 组件,在 Map阶段结束,Shffle 过程中对 Map结果 先进行一次 归并计算,这样传输到 ReduceTask 的数据量大大减少,减少网络传输成本,并且由于计算数据量大大减少,那么 ReduceTask 计算时间也大大减少,实现性能提升。
但是,使用 Combiner 是有前提条件的,那就是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应。
自定义 Combiner 实现步骤:
自定义一个 Combiner 继承 Reducer,重写 Reduce 方法在驱动类中添加设置:job.setCombinerClass(WordCountCombiner.class);
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中,将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中
溢写文件长期不处理,会比较杂乱,定期对溢写文件做 Merge 处理,同样是按照分区,排序的标准,对局部有序的数据排序,使用归并排序,提高效率。
由于上一步我们给多个溢写文件做了归并排序,合并后的大文件还可以进行一次 Combine,合并成一个更小的,计算后的文件。对 ReduceTask 即减少了 网络IO 时间,又减少了原始数据量,效率提高。
MapTask任务完成后,启动相应数量的ReduceTask,并告知ReduceTask处理数据范围(数据分区)。
ReduceTask 主动去 MapTask 中拉取Merge文件(要什么分区拉什么分区),如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。ReduceTask 启动了两个后台线程,一个是负责监控内存中文件大小,超过一定阈值,则进行一次合并后将数据溢写到磁盘上。另一个负责监控磁盘上文件数目,达到一定阈值,则进行一次归并排序以生成一个更大文件;当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
分区排序完成之后,还可以对数据进行分组。
7.2、Partioan 分区数据分区可以帮助我们,根据业务场景来划分计算范围,完成符合需求的范围数据聚合统计等问题。而 MapReduce 的默认分区规则,使用了 hashPartition,源码如下:
public class HashPartitioner
要想按照自己的方式去分区,就要继承 Partition 抽象类,重写 getPartition( ),再任务中提交分区规则Job.setPartitionClass( ),并且控制好 ReduceTask 数量Job.setNumReduceTasks()。
总结如下:
如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx。如果1对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。设置Map端输出的keyjob.setMapOutputKeyClass(FlowBean.class);,Map端输出的value job.setMapOutputValueClass(Text.class);
排序类型如下:
部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。辅助排序:GroupingComparator分组,在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。 7.4、Combiner 归并Combiner 是一个 独立与 Mapper 和 Reducer 之外,在 MapTask所在节点运行的以 Reducer 为父类的组件。由于 MapTask 数量远大于 ReduceTask 数量,所以 ReduceTask 成为了整个运行过程的性能短板,为了提高 ReduceTask 工作效率, MapReduce 提供 Combiner 组件,在 Map阶段结束,Shffle 过程中对 Map结果 先进行一次 归并计算,这样传输到 ReduceTask 的数据量大大减少,减少网络传输成本,并且由于计算数据量大大减少,那么 ReduceTask 计算时间也大大减少,实现性能提升。
但是,使用 Combiner 是有前提条件的,那就是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应。
以下是 自定义 Combiner 实现步骤:
自定义一个 Combiner 继承 Reducer,重写 Reduce 方法在驱动类中添加设置:job.setCombinerClass(WordCountCombiner.class);Combiner和reducer的区别在于运行的位置。
Combiner是在每一个maptask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果。
7.5、压缩压缩就是用一部分的CPU开销(解压缩),来减少减少磁盘 IO、减少磁盘存储空间。所以在运算密集型的 Job,少用压缩,在IO 密集型的 Job,多用压缩。
压缩算法对比:
压缩位置以及压缩算法选择:
Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。
Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。 在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
8.2、ReduceTask 工作机制Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
9、MapReduce Join 9.1、Reduce Join两个表 Join,特别是两个 大表 Join 的时候,Map 端拥有一个表的全部数据,如果这时候 Map 端进行 Join,就是不完整的Join。 Reduce 端接收到 这些不完整的 Join 数据后,需要更多的逻辑来判断 已Join 和 未Join 两个场景。那么一种简单的 Join,就是在 Reduce 端进行 Join,详见如下:
Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。
Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进行合并就 ok 了。
9.2、Map JoinReduce Join 是在 Reduce 阶段进行合并,ReduceTask 的数量一般比较少,并且Reducer处理过多表,特别数据不均衡,很容易出现数据倾斜。如果大表和小表Join,可以在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。注意,因为需要缓存数据,Map Join 需要考虑网络开销 以及 内存开销。
具体实现:采用 DistributedCache
在 Mapper 的 setup 阶段,将文件读取到缓存集合中。
在 Driver 驱动类中加载缓存。
代码上面是 Job.addCacheFile(new URI("pdfs://xxxx"))
10、MapReduce 知识点 10.1、MapReduce 怎么实现 TopN?可以自定义groupingcomparator,或者在map端对数据进行排序,然后再reduce输出时,控制只输出前n个数。就 达到了topn输出的目的。
10.2、有可能使 Hadoop 任务输出到多个目录中么?如果可以,怎么做?可以输出到多个目录中,采用自定义OutputFormat:
自定义outputformat改写recordwriter,具体改写输出数据的方法write( ) 10.3、Hadoop中RecordReader的作用是什么?以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;
系统默认的RecordReader是LineRecordReader
LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;
应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名
字而不是该行在文件中的偏移量。