1、分区
(1)合理设置Map及Reduce数
如果MapReduce数据量过少,则单个的处理数据量过大;如果MapReduce数据量过多,则抢资源。
1)通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
2)是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
一般根据期待数决定。期待数怎么来的,根据输入文件的形状,数据量大小。如果一个数据量过多,一个文件至少一个切片,文件过大则多切几个片。当然有些时候,一个任务很多小文件(10k文件有1,000,000个,不可能启动1,000,000个task),这时可以采用其他手段来做。
3)是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
Hive处理数据压力来自数据量,数据的行数。同样是128M,条目可能不一样。一行10k数据和一行1k数据,条目数量差了10倍。甚至一行100k数据,这样128M就没几行了(100多行而已)。因为128M是大部分比较合适的大小,但并不是绝对合适。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
(2)复杂文件增加Map数
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
这个针对的是计算命题性的任务。1kb数据要计算10分钟,128M就不知道要算到什么时候,所以这种事就要谨慎,任务量少一点。
增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
案例实操:
1).执行查询
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
正在上传…重新上传取消正在上传…重新上传取消正在上传…重新上传取消
只有1个map。
2).设置最大切片值为100个字节(当然只是演示而已,实际上不会这么做的)
hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1
正在上传…重新上传取消正在上传…重新上传取消正在上传…重新上传取消
有6个map。
将来一个map处理的数据非常慢的时候,可以手动设置大小,让它处理的灵活一点。
2、小文件进行合并
如果输入文件过小的时候,放置大量的maptask也不划算,
(1)在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
(2)在Map-Reduce的任务结束时合并小文件的设置:
在map-only任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;
在map-reduce任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;
合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;
当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;
3、合理设置Reduce数
1).调整reduce个数方法一
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
(3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
2).调整reduce个数方法二
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
3).reduce个数并不是越多越好
(1)过多的启动和初始化reduce也会消耗时间和资源;
(2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;
2、并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。
3、严格模式
Hive提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询。(防止新手手滑)
通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。
The mode in which the Hive operations are being performed.
In strict mode, some risky queries are not allowed to run、They include:
Cartesian Product.
No partition being picked up for a query.
Comparing bigints and strings.
Comparing bigints and doubles.
Orderby without limit.
1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。
3)限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERe语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
这种严格模式公司里面都是打开的,而且设置的很严。
4、JVM重用
对于数据量很小的任务,一个半task处理完了。JVM一般就销毁了。第二个任务重新启动。而JVM重用则是,当你的maptask结束后,JVM虚拟机不关,启动另一个maptask。来来回回省掉了开虚拟机的开销。而且JVM可以重用不止一次(一般可以10~20次)。小文件处理的时候,效果非常明显,所以好处就是速度加快了。坏处则是,由于Java内存管理是虚拟机自己通过gc完成的,gc可以把不需要的类回收了,但这个过程不是100%的,JVM用的越多,内存中的垃圾就越多。重用200次理论上也可以,但是用到200次的时候,会更加慢,时间都浪费在gc上了。
这事为什么C++没用,C++的内存全靠人类自己掌控。
第1:数组越界。Java数组越界会抛出个异常。C++会告诉你这个内存不能读,然后程序就崩溃死掉了,没有捕获异常的机会。
第2:C++的内存管理全部手工管理。什么时候对象该删除,什么时候对象不该删除。全部由程序员自己搞定。但这个是个双刃剑,不好的地方在于这个很累,好的地方在于控制力很强,用完了可以回收掉,资料利用的非常紧凑。
第3:指针。指针是C++最让人头疼的地方。Java只有引用,没有指针。
JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。
Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
no limit.
这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
5、推测执行
在yarn的时候涉及到任务的推测执行。其中有一个maptask执行的非常慢,而其他的很快,hadoop框架会单独给这个很慢的启动一个备份。举个栗子,开发一个项目,C很慢,开一个D,干C的活。
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
设置开启推测执行参数:Hadoop的mapred-site.xml文件中进行配置,默认是true
may be executed in parallel.
may be executed in parallel.
不过hive本身也提供了配置项来控制reduce-side的推测执行:默认是true。假如太慢了,就加一个备份任务。
关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。
任务推测执行不能乱开。推测执行是指执行的时候很慢的时候可以开一个备份。Mapreduce任务它只能在map阶段开任务,不能在reduce端开。如果发生了数据倾斜,启动一个任务慢的原因不是因为这个结点太慢,而是因为这个任务处理的数据量太大。这时多启动几个备份任务除了浪费资源外,没有任何卵用。所以到了reduce端不会启动备份任务,因为不确定这个慢到底是因为什么原因引起的,更大概率的可能是由于数据倾斜引起的。此时启动任务推测执行反而会有反效果。
Hive提供了reduce端的推测执行,但这个一定要小心。如果产生了数据倾斜,可能会得不偿失。
6、压缩
详见7.8章节
压缩后,采用列式存储能提供效率。
7、执行计划(Explain)
1.基本语法
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
2.案例实操
(1)查看下面这条语句的执行计划
hive (default)> explain select * from emp;
hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;
(2)查看详细执行计划
hive (default)> explain extended select * from emp;
hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;