好好理解Map-Reduce过程,多看看执行计划【explain 查询语句】,就会更帮帮地理解咯
二、Hive-SQL语句转化成MapReduce 1)Join的实现原理select u.name, o.orderid from order o join user u on o.uid = u.uid;
2)Group By的实现原理 将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,
在reduce阶段保存LastKey区分不同的key。
MapReduce的过程如下(当然这里只是说明Reduce端的非Hash聚合过程)
select rank, isonline, count(*) from city group by rank, isonline;
3)Distinct的实现原理:单个字段select dealid, count(distinct uid), count(distinct date) from order group by dealid;
4)Distinct的实现原理:多个字段select dealid, count(distinct uid)from order group by dealid;
三、分组 1、group by 子句错误示例:select deptno, job, count(*) from emp;
想要聚合操作,且有其他列名。那么一定要有group by子句
正确示例:
select count(*) from emp;;
select t.deptno, t.job, max(t.sal) max_sal from emp t group by t.deptno, t.job
四、排序order by:全局排序
sort by 每个reducer内单独排序,最后再合一起(与全局排序的结果是不一样,是每个reducer内排序后叠在一起)
五、分区&分桶1.索引
Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。
Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
**为什么要创建索引?**Hive的索引目的是提高Hive表指定列的查询速度。没有索引时,类似’WHERe tab1.col1 = 10’ 的查询,Hive会加载整张表或分区,然后处理所有的rows,但是如果在字段col1上面存在索引时,那么只会加载和处理文件的一部分。
与其他传统数据库一样,增加索引在提升查询速度时,会消耗额外资源去创建索引表和需要更多的磁盘空间存储索引。
2.分区
为了对表进行合理的管理以及提高查询效率,Hive可以将表组织成“分区”。
分区是表的部分列的集合,可以为频繁使用的数据建立分区,这样查找分区中的数据时就不需要扫描全表,这对于提高查找效率很有帮助。
分区是一种根据“分区列”(partition column)的值对表进行粗略划分的机制。Hive中每个分区对应着表很多的子目录,将所有的数据按照分区列放入到不同的子目录中去。
**为什么要分区?**庞大的数据集可能需要耗费大量的时间去处理。在许多场景下,可以通过分区的方法减少每一次扫描总数据量,这种做法可以显著地改善性能。
数据会依照单个或多个列进行分区,通常按照时间、地域或者是商业维度进行分区。为了达到性能表现的一致性,对不同列的划分应该让数据尽可能均匀分布。最好的情况下,分区的划分条件总是能够对应where语句的部分查询条件。
Hive的分区使用HDFS的子目录功能实现。每一个子目录包含了分区对应的列名和每一列的值。但是由于HDFS并不支持大量的子目录,这也给分区的使用带来了限制。我们有必要对表中的分区数量进行预估,从而避免因为分区数量过大带来一系列问题。
Hive查询通常使用分区的列作为查询条件。这样的做法可以指定MapReduce任务在HDFS中指定的子目录下完成扫描的工作。HDFS的文件目录结构可以像索引一样高效利用。
3.分桶
桶是通过对指定列进行哈希计算来实现的,通过哈希值将一个列名下的数据切分为一组桶,并使每个桶对应于该列名下的一个存储文件。
**为什么要分桶?**在分区数量过于庞大以至于可能导致文件系统崩溃时,我们就需要使用分桶来解决问题了。
分区中的数据可以被进一步拆分成桶,不同于分区对列直接进行拆分,桶往往使用列的哈希值对数据打散,并分发到各个不同的桶中从而完成数据的分桶过程。
hive使用对分桶所用的值进行hash,并用hash结果除以桶的个数做取余运算的方式来分桶,保证了每个桶中都有数据,但每个桶中的数据条数不一定相等。
哈希函数的选择依赖于桶操作所针对的列的数据类型。除了数据采样,桶操作也可以用来实现高效的Map端连接操作。
在数据量足够大的情况下,分桶比分区,更高的查询效率。
4.总结
索引和分区最大的区别就是索引不分割数据库,分区分割数据库。
索引其实就是拿额外的存储空间换查询时间,但分区已经将整个大数据库按照分区列拆分成多个小数据库了。
分区和分桶最大的区别就是分桶随机分割数据库,分区是非随机分割数据库。因为分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜。
其次两者的另一个区别就是分桶是对应不同的文件(细粒度),分区是对应不同的文件夹(粗粒度)。
普通表(外部表、内部表)、分区表这三个都是对应HDFS上的目录,桶表对应是目录里的文件。
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
不走MR任务:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9OJZ0byX-1645859394946)(https://gitee.com/yufeng0507/cloud-image/raw/master/img/202202251135194.png)]
走MR任务:
2)Fetch抓取Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。
例如:SELECt * FROM employees;在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件,然后输出查询结果到控制台。
在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive 默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走mapreduce。
hive.fetch.task.conversion=more:全局查找、字段查找、limit 查找等都不走MR。
hive.fetch.task.conversion=none:所有查询语句都走MR
3)本地模式:应对小数据集大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过, 有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto的值为 true,来让 Hive 在适当的时候自动启动这个优化。
//开启本地 mrset hive.exec.mode.local.auto=true; //设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认为 134217728,即 128Mset hive.exec.mode.local.auto.inputbytes.max=50000000;//设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默认为 4set hive.exec.mode.local.auto.input.files.max=10;
4) 表的优化 1、小表join大表(mapjoin)将 key 相对分散,并且数据量小的表放在 join 的左边,可以使用 map join 让小的维度表 先进内存。在 map 端完成 join。
实际测试发现:新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放 在左边和右边已经没有区别。
(1)设置自动选择 Mapjoin
set hive.auto.convert.join = true; 默认为 true
(2)大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize = 25000000;
把小表放到内存中,加快速度
2、大表join大表【再理解】 1)空KEY过滤有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同 的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下, 这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。
此时我们认为这些空KEY是没有用的异常数据
没有过滤空id
insert overwrite table jointable select n.* from nullidtable n left join bigtable o on n.id = o.id;
过滤空id
insert overwrite table jointable select n.* from (select * from nullidtable where id is not null) n left join bigtable o on n.id = o.id;
2)空KEY转换时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地 分不到不同的 reducer 上。
此时我们认为这些空KEY是有用的,但需就加以处理,解决数据倾斜
原来code:
insert overwrite table jointableselect n.* from nullidtable n left join bigtable b on n.id = b.id;
修改后code:
set mapreduce.job.reduces = 5;insert overwrite table jointableselect n.* from nullidtable n full join bigtable o on nvl(n.id,rand()) = o.id;
null 值本来就是匹配不上的,只是我们要保留这些null,那么就给他们分配一个随机数,让他们均匀的走到各个reducer去,
注意,这里给的随机数不能出现和id一样的数字,不然岂不是原来匹配不上的数据,现在又匹配上了。
3)SMB(Sort Merge Bucket join) 3、Group By默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。
并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行 部分聚合,最后在 Reduce 端得出最终结果
本质:
旧的逻辑:
Table a 和 Table b,map端并行读取,然后在reduce中再进行聚合,这样再reduce中聚合的话如果数据很多的话就压力很大。
新的逻辑:
当选项set hive.groupby.skewindata = true 设定为 true,生成的查询计划会有两个 MR Job。
第一个 MR Job 中,Map 的输出结果会**随机分布【不按照分组字段来】**到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果 是相同的, Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;
第二 个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证 相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
1)开启Map端聚合参数设置
(1)是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true
(2)在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000
(3)有数据倾斜的时候进行负载均衡(默认是 false)
set hive.groupby.skewindata = true
4、Count(Distinct)数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成, 一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换,但是需要注意 group by 造成 的数据倾斜问题.
5.避免笛卡尔积尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1 个reducer 来完成笛卡尔积。
6.行列过滤**列处理:**在 SELECt 中,只拿需要的列,如果有分区,尽量使用分区过滤,少用 SELECT *。
**行处理:**在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面,那么就会先全表关联,之后再过滤,
比如: 案例实操:建议【先筛选再连接】
1)先关联,再筛选
select o.id from bigtable b join bigtable o on o.id = b.idwhere o.id <= 10;
2)先筛选,再关联。【建议】
select b.id from bigtable bjoin (select id from bigtable where id <= 10) oon b.id = o.id;
7.使用分区表 8.使用分桶表 5)合理设置Map及Reduce数1)通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。
**主要的决定因素有:**input 的文件总个数,input 的文件大小,集群设置的文件块大小。
2)是不是 map 数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小 128m),则每个小文件
也会被当做一个块,用一个 map 任务来完成,而一个 map 任务启动和初始化的时间远远大 于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。
3)是不是保证每个 map 处理接近 128m 的文件块,就高枕无忧了?
答案也是不一定。比如有一个 127m 的文件,正常会用一个 map 去完成,但这个文件只 有一个或者两个小字段,却有几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定也比较耗时。
针对上面的问题 2 )和 3),我们需要采取两种方式来解决:即减少 map 数和增加 map 数;
5.1 复杂文件【增加Map数】当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map 数, 来使得每个 map 处理的数据量减少,从而提高任务的执行效率。
增加 map 的方法为:根据
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,
调整 maxSize 最大值。让 maxSize 低于 blocksize 就可以增加 map 的个数。
5.2 小文件进行合并【减少Map数】1)在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合 并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。
2)在 Map-Reduce 的任务结束时合并小文件的设置: 在 map-only 任务结束时合并小文件,默认 true
SET hive.merge.mapfiles = true;
在 map-reduce 任务结束时合并小文件,默认 false
SET hive.merge.mapredfiles = true;
合并文件的大小,默认 256M
SET hive.merge.size.per.task = 268435456;
当输出文件的平均大小小于该值时,启动一个独立的 map-reduce 任务进行文件 merge
SET hive.merge.smallfiles.avgsize = 16777216;
5.3 【设置Reduce数】 调整reduce,方法一: (1)每个 Reduce 处理的数据量默认是 256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每个任务最大的 reduce 数,默认为 1009
hive.exec.reducers.max=1009
(3)计算 reducer 数的公式
N=min(参数 2,totalInputFileSize/参数 1)
源码解析Hive是如何动态计算reduce个数的
int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), work.isFinalMapRed()); public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary, MapWork work, boolean finalMapRed) throws IOException { // bytesPerReducer默认值为256M BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", 256000000L) long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); //maxReducers的默认值1009 MAXREDUCERS("hive.exec.reducers.max", 1009) int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); //对totalInputFileSize的计算 double samplePercentage = getHighestSamplePercentage(work); long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage); // if all inputs are sampled, we should shrink the size of reducers accordingly. if (totalInputFileSize != inputSummary.getLength()) { LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize); } else { LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers + " totalInputFileSize=" + totalInputFileSize); } // If this map reduce job writes final data to a table and bucketing is being inferred, // and the user has configured Hive to do this, make sure the number of reducers is a // power of two boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) && finalMapRed && !work.getBucketedColsByDirectory().isEmpty(); //【真正计算reduce个数的方法】看源码的技巧return的方法是重要核心方法 return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo); }
reduce个数计算过程
public static int estimateReducers(long totalInputFileSize, long bytesPerReducer, int maxReducers, boolean powersOfTwo) { // 假设totalInputFileSize 1000M // bytes=Math.max(1000M,256M)=1000M double bytes = Math.max(totalInputFileSize, bytesPerReducer); //reducers=(int)Math.ceil(1000M/256M)=4 此公式说明如果totalInputFileSize 小于256M ,则reducers=1 ;繁殖 则通过 int reducers = (int) Math.ceil(bytes / bytesPerReducer); //Math.max(1, 4)=4 ,reducers的结果还是4 reducers = Math.max(1, reducers); //Math.min(1009,4)=4; reducers的结果还是4 reducers = Math.min(maxReducers, reducers); int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1; int reducersPowerTwo = (int)Math.pow(2, reducersLog); if (powersOfTwo) { // If the original number of reducers was a power of two, use that if (reducersPowerTwo / 2 == reducers) { // nothing to do } else if (reducersPowerTwo > maxReducers) { // If the next power of two greater than the original number of reducers is greater // than the max number of reducers, use the preceding power of two, which is strictly // less than the original number of reducers and hence the max reducers = reducersPowerTwo / 2; } else { // Otherwise use the smallest power of two greater than the original number of reducers reducers = reducersPowerTwo; } } return reducers; }
调正reduce,方法二: 在 hadoop 的 mapred-default.xml 文件中修改 设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
(1)过多的启动和初始化 reduce 也会消耗时间和资源;
(2)另外,有多少个 reduce,就会有多少个输出文件,如果生成了很多个小文件,那 么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数; 使单个 reduce 任务处理数据量大小要合适;
6)设置并行执行Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽 样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下, Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能 并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行 时间缩短。不过,如果有更多的阶段可以并行执行,那么 job 可能就越快完成。
通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。不过,在共享集群中, 需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。
当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。
7)严格模式Hive 可以通过设置防止一些危险操作:
1.分区表不使用分区过滤将 SET hive.strict.checks.no.partition.filter=true时,对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有 进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2.使用order by没有limit过滤将 SET hive.strict.checks.orderby.no.limit=true时,对于使用了 order by 语句的查询,要求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reducer 中进行处理,强制要求用户增加这个 LIMIT 语句可以防止 Reducer 额外执行很长一 段时间。
3.笛卡尔积将 SET hive.strict.checks.cartesian.product=true 时,会限制笛卡尔积的查询。对关系型数 据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语 句,这样关系数据库的执行优化器就可以高效地将 WHERe 语句转化成那个 ON 语句。不幸 的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
8)JVM重用适合小文件多的时候
9)设置压缩格式hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一 般选择 snappy,lzo。
**。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有 进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2.使用order by没有limit过滤将 SET hive.strict.checks.orderby.no.limit=true时,对于使用了 order by 语句的查询,要求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reducer 中进行处理,强制要求用户增加这个 LIMIT 语句可以防止 Reducer 额外执行很长一 段时间。
3.笛卡尔积将 SET hive.strict.checks.cartesian.product=true 时,会限制笛卡尔积的查询。对关系型数 据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语 句,这样关系数据库的执行优化器就可以高效地将 WHERe 语句转化成那个 ON 语句。不幸 的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
8)JVM重用适合小文件多的时候
9)设置压缩格式hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一 般选择 snappy,lzo。