欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Hive调优及参数优化

时间:2023-05-14
1 Hive参数优化 1.1 Hive基础配置 1.1.1 HiveServer2 的 Java 堆栈

Hiveserver2异常退出,导致连接失败的问题。
解决方法:修改HiveServer2 的 Java 堆栈大小。

 

1.1.2 SQL中limit用的较多时

limit 语句快速出结果一般情况下,Limit语句还是需要执行整个查询语句,然后再返回部分结果。有一个配置属性可以开启,避免这种情况。

Set hive.limit.optimize.enable=true;--(默认为false)Set hive.limit.row.max.size=100000;--(limit最多可以查询多少行,根据需求可以调大)Set hive.limit.optimize.limit.file=10;--(一个查询可以操作的最多文件数,根据需要适当调大)Set hive.limit.optimize.fetch.max=50000;--(fetch query,直接select from,能够获取的最大行数)

1.1.3 Hive执行引擎

        CDH支持的引擎包括MapReduce和Spark两种,可自由选择,Spark不一定比MR快,Hive2.x和Hadoop3.x经过多次优化,Hive-MR引擎的性能已经大幅提升。

1.2 压缩配置 1.2.1 Map输出压缩

        除了创建表时指定保存数据时压缩,在查询分析过程中,Map的输出也可以进行压缩。由于map任务的输出需要写到磁盘并通过网络传输到reducer节点,所以通过使用LZO、LZ4或者Snappy这样的快速压缩方式,是可以获得性能提升的,因为需要传输的数据减少了。

MapReduce配置项:

--设置是否启动map输出的压缩机制,默认为false。在需要减少网络传输的时候,可以设置为true。set mapreduce.map.output.compress

1.2.2 Reduce结果压缩

        是否对任务输出产生的结果进行压缩,默认值false。对传输数据进行压缩,既可以减少文件的存储空间,又可以加快数据在网络不同节点之间的传输速度。

1.2.3 Hive的Map-Reduce之间是否进行压缩

        控制 Hive 在多个 map-reduce 作业之间生成的中间 files 是否被压缩。压缩编解码器和其他选项由上面Job的变量mapreduce.output.fileoutputformat.compress.*确定。

set hive.exec.compress.intermediate=true;

1.2.4 Hive查询最终结果压缩

        控制是否压缩查询的最终输出(到 local/hdfs 文件或 Hive table)。压缩编解码器和其他选项由 上面Job中的变量mapreduce.output.fileoutputformat.compress.*确定。

set hive.exec.compress.output=true;


2 Hive优化 2.1 Hive分桶

        分桶是将数据集分解成更容易管理的若干部分的一个技术,是比分区更为细粒度的数据范围划分。

2.1.1 为什么要分桶

2.1.1.1 获得更高的查询处理效率

        在分区数量过于庞大以至于可能导致文件系统崩溃时,或数据集找不到合理的分区字段时,我们就需要使用分桶来解决问题了。
        分区中的数据可以被进一步拆分成桶,不同于分区对列直接进行拆分,桶往往使用列的哈希值对数据打散,并分发到各个不同的桶中从而完成数据的分桶过程。
        注意,hive使用对分桶所用的值进行hash,并用hash结果除以桶的个数做取余运算的方式来分桶,保证了每个桶中都有数据,但每个桶中的数据条数不一定相等。
        如果另外一个表也按照同样的规则分成了一个个小文件。两个表join的时候,就不必要扫描整个表,只需要匹配相同分桶的数据即可,从而提升效率。
        在数据量足够大的情况下,分桶比分区有更高的查询效率。

2.1.1.2 数据采样

        在真实的大数据分析过程中,由于数据量较大,开发和自测的过程比较慢,严重影响系统的开发进度。此时就可以使用分桶来进行数据采样。采样使用的是一个具有代表性的查询结果而不是全部结果,通过对采样数据的分析,来达到快速开发和自测的目的,节省大量的研发成本。        

2.1.2 分桶与分区的区别 分桶和分区两者不干扰,可以把分区表进一步分桶;分桶对数据的处理比分区更加细粒度化:分区针对的是数据的存储路径;分桶针对的是数据文件;分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜。2.1.3 文本数据处理

        注意:对于分桶表,不能使用load data的方式进行数据插入操作,因为load data导入的数据不会有分桶结构。

如何避免针对桶表使用load data插入数据的误操作呢?

--限制对桶表进行load操作set hive.strict.checks.bucketing = true;

        也可以在CM的hive配置项中修改此配置,当针对桶表执行load data操作时会报错。

那么对于文本数据如何处理呢?

1、先创建临时表,通过load data将txt文本导入临时表。

--创建临时表create table temp_buck(id int, name string)row format delimited fields terminated by 't';--导入数据load data local inpath '/tools/test_buck.txt' into table temp_buck;

2、使用insert select语句间接的把数据从临时表导入到分桶表。

--启用桶表set hive.enforce.bucketing=true;--限制对桶表进行load操作set hive.strict.checks.bucketing = true;--insert selectinsert into table test_buck select id, name from temp_buck;--分桶成功

2.1.4 Map join

        MapJoin顾名思义,就是在Map阶段进行表之间的连接。而不需要进入到Reduce阶段才进行连接。这样就节省了在Shuffle阶段时要进行的大量数据传输。从而起到了优化作业的作用。
        要使MapJoin能够顺利进行,那就必须满足这样的条件:除了一份表的数据分布在不同的Map中外,其他连接的表的数据必须在每个Map中有完整的拷贝。
        所以并不是所有的场景都适合用MapJoin。它通常会用在如下的一些情景:在二个要连接的表中,有一个很大,有一个很小,这个小表可以存放在内存中而不影响性能。
        这样我们就把小表文件复制到每一个Map任务的本地,再让Map把文件读到内存中待用。
        在Hive v0.7之前,需要使用hint提示 才会执行MapJoin。Hive v0.7之后的版本已经不需要给出MapJoin的指示就进行优化。现在可以通过如下配置参数来进行控制:    

set hive.auto.convert.join=true;

    Hive还提供另外一个参数--表文件的大小作为开启和关闭MapJoin的阈值:

--旧版本为hive.mapjoin.smalltable.filesizeset hive.auto.convert.join.noconditionaltask.size=512000000

注意:如果hive.auto.convert.join是关闭的,则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于512MB,则直接将连接转为Map连接。默认值为20MB。 

MapJoin的使用场景:

关联操作中有一张表非常小不等值的链接操作

2.1.4.1 大小表关联

select f.a,f.b from A t join B f on ( f.a=t.a and f.ftime=20110802)

        该语句中B表有30亿行记录,A表只有100行记录,而且B表中数据倾斜特别严重,有一个key上有15亿行记录,在运行过程中特别的慢,而且在reduece的过程中遇到执行时间过长或者内存不够的问题。
        MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map时进行了join操作,省去了reduce运行的效率会高很多。
        这样就不会由于数据倾斜导致某个reduce上落数据太多而失败。于是原来的sql可以通过使用hint的方式指定join时使用mapjoin。

select f.a,f.b from A t join B f on ( f.a=t.a and f.ftime=20110802)

        在实际使用中,只要根据业务调整小表的阈值即可,hive会自动帮我们完成mapjoin,提高执行的效率。

2.1.4.2 不等连接

        mapjoin还有一个很大的好处是能够进行不等连接的join操作,如果将不等条件写在where中,那么mapreduce过程中会进行笛卡尔积,运行效率特别低,如果使用mapjoin操作,在map的过程中就完成了不等值的join操作,效率会高很多。

select A.a ,A.b from A join B where A.a>B.a

2.1.4.3 使用限制

LEFT OUTER JOIN的左表必须是大表;RIGHT OUTER JOIN的右表必须是大表;INNER JOIN左表或右表均可以作为大表;FULL OUTER JOIN不能使用MAPJOIN;MAPJOIN支持小表为子查询;使用MAPJOIN时需要引用小表或是子查询时,需要引用别名;在MAPJOIN中,可以使用不等值连接或者使用OR连接多个条件;在MAPJOIN中最多支持指定6张小表,否则报语法错误;如果使用MAPJOIN,则所有小表占用的内存总和不得超过设置的内存(解压后的逻辑数据量)。2.1.5 Bucket-Map Join

2.1.5.1 作用

        两个表join的时候,小表不足以放到内存中,但是又想用map side join这个时候就要用到bucket Map join。其方法是两个join表在join key上都做hash bucket,并且把你打算复制的那个(相对)小表的bucket数设置为大表的倍数。这样数据就会按照key join,做hash bucket。小表依然复制到所有节点,Map join的时候,小表的每一组bucket加载成hashtable,与对应的一个大表bucket做局部join,这样每次只需要加载部分hashtable就可以了。

2.1.5.1 条件

set hive.optimize.bucketmapjoin = true;一个表的bucket数是另一个表bucket数的整数倍bucket列 == join列必须是应用在map join的场景中

注意:如果表不是bucket的,则只是做普通join。

2.1.6 SMB join

        全称Sort Merge Bucket Join。

2.1.6.1 作用

        大表对小表应该使用MapJoin来进行优化,但是如果是大表对大表,如果进行shuffle,那就非常可怕,第一个慢不用说,第二个容易出异常,此时就可以使用SMB Join来提高性能。SMB Join基于bucket-mapjoin的有序bucket,可实现在map端完成join操作,可以有效地减少或避免shuffle的数据量。SMB join的条件和Map join类似但又不同。        

2.1.6.2 条件

bucket mapjoinSMB joinset hive.optimize.bucketmapjoin = true;set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;一个表的bucket数是另一个表bucket数的整数倍小表的bucket数=大表bucket数bucket列 == join列Bucket 列 == Join 列 == sort 列必须是应用在map join的场景中必须是应用在bucket mapjoin 的场景中

2.1.6.3 注意事项

        hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表数据sorted,否则可能数据不正确。
        有两个办法:

hive.enforce.sorting 设置为 true。开启强制排序时,插数据到表中会进行强制排序,默认false。插入数据时通过在sql中用distributed c1 sort by c1 或者 cluster by c1

另外,表创建时必须是CLUSTERED且SORTED,如下:

create table test_smb_2(mid string,age_id string)CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS;

综上,涉及到分桶表操作的齐全配置为:

--写入数据强制分桶set hive.enforce.bucketing=true;--写入数据强制排序set hive.enforce.sorting=true;--开启bucketmapjoinset hive.optimize.bucketmapjoin = true;--开启SMB Joinset hive.auto.convert.sortmerge.join=true;set hive.auto.convert.sortmerge.join.noconditionaltask=true;

开启MapJoin的配置(hive.auto.convert.join和hive.auto.convert.join.noconditionaltask.size),还有限制对桶表进行load操作(hive.strict.checks.bucketing)可以直接设置在hive的配置项中,无需在sql中声明。
自动尝试SMB联接(hive.optimize.bucketmapjoin.sortedmerge)也可以在设置中进行提前配置。


3 Hive并行操作 3.1 Hive编译查询限制

        Hive默认同时只能编译一段HiveQL,并上锁。
        将hive.driver.parallel.compilation设置为true,各个会话可以同时编译查询,提高团队工作效率。否则如果在UDF中执行了一段HiveQL,或者多个用户同时使用的话, 就会锁住。
        修改hive.driver.parallel.compilation.global.limit的值,0或负值为无限制,可根据团队人员和硬件进行修改,以保证同时编译查询。

3.2 Hive不同阶段任务并行执行

        Hive会将一个查询转化为一个或多个阶段,包括:MapReduce阶段、抽样阶段、合并阶段、limit阶段等。默认情况下,一次只执行一个阶段。不过,如果某些阶段不是互相依赖,是可以并行执行的。
 

set hive.exec.parallel=true; --可以开启并发执行,默认为falseset hive.exec.parallel.thread.number=16; --同一个sql允许的最大并行度,默认为8


4 Hive索引

        Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。
        Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
        在可以预见到分区数据非常庞大的情况下,分桶和索引常常是优于分区的。而分桶由于SMB Join对关联键要求严格,所以并不是总能生效。

4.1 Hive索引

        Hive的索引目的是提高Hive表指定列的查询速度。
        没有索引时,类似'WHERe tab1.col1 = 10' 的查询,Hive会加载整张表或分区,然后处理所有的rows,但是如果在字段col1上面存在索引时,那么只会加载和处理文件的一部分。
        在每次建立、更新数据后,Hive索引不会自动更新,需要手动进行更新(重建索引以构建索引表),会触发一个mr job。
        Hive索引使用过程繁杂,而且性能一般,在Hive3.0中已被删除,在工作环境中不推荐优先使用,在分区数量过多或查询字段不是分区字段时,索引可以作为补充方案同时使用。推荐使用ORC文件格式的索引类型进行查询。

4.2 Row Group Index

        一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个column的min/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes。
        而其中为每个stripe建立的包含min/max值的索引,就称为Row Group Index行组索引,也叫min-max Index大小对比索引,或者Storage Index。
        在建立ORC格式表时,指定表参数’orc.create.index’=’true’之后,便会建立Row Group Index,需要注意的是,为了使Row Group Index有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max会失去意义。另外,这种索引主要用于数值型字段的查询过滤优化上。
        设置hive.optimize.index.filter为true,并重启hive

创建表

CREATE TABLE lxw1234_orc2 stored AS ORCTBLPROPERTIES( 'orc.compress'='SNAPPY',-- 开启行组索引 'orc.create.index'='true')AS SELECt CAST(siteid AS INT) AS id, pcid FROM lxw1234_text-- 插入的数据保持排序 DISTRIBUTE BY id sort BY id;

查询

set hive.optimize.index.filter=true;SELECt COUNT(1) FROM lxw1234_orc1 WHERe id >= 1382 AND id <= 1399;

4.3 Bloom Filter Index

        在建表时候,通过表参数”orc.bloom.filter.columns”=”pcid”来指定为那些字段建立BloomFilter索引,这样,在生成数据的时候,会在每个stripe中,为该字段建立BloomFilter的数据结构,当查询条件中包含对该字段的=号过滤时候,先从BloomFilter中获取以下是否包含该值,如果不包含,则跳过该stripe。

创建表

CREATE TABLE lxw1234_orc2 stored AS ORCTBLPROPERTIES( 'orc.compress'='SNAPPY', 'orc.create.index'='true',-- pcid字段开启BloomFilter索引 "orc.bloom.filter.columns"="pcid")ASSELECt CAST(siteid AS INT) AS id,pcidFROM lxw1234_textDISTRIBUTE BY id sort BY id;

查询

SET hive.optimize.index.filter=true;SELECt COUNT(1) FROM lxw1234_orc1 WHERe id >= 0 AND id <= 1000 AND pcid IN ('0005E26F0DCCDB56F9041C','A');

只有在数据量较大时,使用索引才能带来性能优势。


5 数据清洗转换优化 5.1 Hive小文件合并

此部分设置,要根据硬件内存来进行调整,个人电脑配置较低,不建议修改。

是否开启合并Map端小文件,在Map-only的任务结束时合并小文件,true是打开。

hive.merge.mapfiles

是否开启合并Reduce端小文件,在map-reduce作业结束时合并小文件。true是打开。

hive.merge.mapredfiles

合并后MR输出文件的大小,默认为256M。

hive.merge.size.per.task

当输出文件的平均大小小于此设置值时,启动一个独立的map-reduce任务进行文件merge,默认值为16M。

hive.merge.smallfiles.avgsize

5.2 矢量化查询

        hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种hive特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,注意:要使用矢量化查询执行,就必须以ORC格式存储数据。

set hive.vectorized.execution.enabled=true;

5.3 读取零拷贝

        ORC可以使用新的HDFS缓存API和ZeroCopy读取器来避免在扫描文件时将额外的数据复制到内存中。

set hive.exec.orc.zerocopy=true;


6 统计分析优化 6.1 关联优化器

        在Hive的一些复杂关联查询中,可能同时还包含有group by等能够触发shuffle的操作,有些时候shuffle操作是可以共享的,通过关联优化器选项,可以尽量减少复杂查询中的shuffle,从而提升性能。

set hive.optimize.correlation=true;

6.2 数据倾斜Skewin

        如果数据量很大或者出现了数据倾斜比较严重的情况,如何来优化呢?

6.2.1 Map join、Bucket-Map join、SMB join 6.2.2 表连接数据倾斜(Join skewin)

6.2.2.1 运行时优化

set hive.optimize.skewjoin=true; --默认关闭。

        如果大表和大表进行join操作,则可采用skewjoin(倾斜关联)来开启对倾斜数据的优化。

skewjoin原理:对于skewjoin.key,在执行job时,将它们存入临时的HDFS目录,其它数据正常执行对倾斜数据开启map join操作(多个map并行处理),对非倾斜值采取普通join操作将倾斜数据集和非倾斜数据集进行合并Union操作。

开启skewin以后,究竟多大的数据才会被认为是倾斜了的数据呢?

set hive.skewjoin.key=100000;

默认值100000。
        如果join的key对应的记录条数超过这个值,就认为这个key产生了数据倾斜,则会对其进行分拆优化。

6.2.2.2 编译时优化

        上面的配置项其实应该理解为hive.optimize.skewjoin.runtime,也就是sql运行时来对偏斜信息进行优化;除此之外还有另外一个配置:

set hive.optimize.skewjoin.compiletime=true; --默认关闭。

        此参数的用处和上面的hive.optimize.skewjoin一致,但在编译sql时就已经将执行计划优化完毕。但要注意的是,只有在表的元数据中存储的有数据倾斜信息时,才能生效。因此建议runtime和compiletime都设置为true。
        可以通过建表语句来指定数据倾斜元数据:

CREATE TABLE list_bucket_single (key STRING, value STRING)-- 倾斜的字段和需要拆分的key值SKEWED BY (key) ON (1,5,6)-- 为倾斜值创建子目录单独存放[STORED AS DIRECTORIES];

6.2.2.3 Union时优化

        应用了表连接倾斜优化以后,会在执行计划中插入一个新的union操作,此时建议开启对union的优化配置:

set hive.optimize.union.remove=true; --默认关闭。

        此项配置减少对Union all子查询中间结果的二次读写,可以避免union输出的额外扫描过程,当我们开启了skewjoin时尤其有用,建议同时开启。

set hive.optimize.skewjoin=true;set hive.optimize.skewjoin.compiletime=true;set hive.optimize.union.remove=true;

6.2.3 分组统计数据倾斜(Groupby skewin)

6.2.3.1 Map阶段聚合

hive.map.aggr=true;

        开启map端combiner。此配置可以在group by语句中提高HiveQL聚合的执行性能。这个设置可以将顶层的聚合操作放在Map阶段执行,从而减轻数据传输和Reduce阶段的执行时间,提升总体性能。默认开启,无需显示声明。

6.2.3.1 两个MRJob

hive.groupby.skewindata=true; --默认关闭。

        这个配置项是用于决定group by操作是否支持倾斜数据的负载均衡处理。当数据出现倾斜时,如果该变量设置为true,那么Hive会自动进行负载均衡。
        当选项设定为 true,生成的查询计划会有两个 MR Job。
        第一个MR Job中,Map 的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
        第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

        注意:在多个列上进行的去重操作与hive环境变量hive.groupby.skewindata存在冲突。
当hive.groupby.skewindata=true时,hive不支持多列上的去重操作,并报错:

Error in semantic analysis: DISTINCT on different columns notsupported with skew in data.

         比如:

-- 1、2、3能够正常执行,但是4会报错。(1) SELECt count(DISTINCT uid) FROM log(2) SELECt ip, count(DISTINCT uid) FROM log GROUP BY ip(3) SELECt ip, count(DISTINCT uid, uname) FROMlog GROUP BY ip(4) SELECt ip, count(DISTINCTuid), count(DISTINCT uname) FROMlog GROUP BY ip


7 Hive优化器 7.1 用处

        用来统计表或分区的行数,以及列的统计信息。主要用于查询优化。基于代价的优化(CBO)引擎可以利用优化器产生的统计数据来快速获取某些查询结果,从而提升统计的性能。比如统计用户的年龄分布占比,或者APP使用量TopN、以及会话信息的count(distinct())数量等,都能利用这种特性进行加速。

7.2 语法

表与分区的状态信息统计:

ANALYZE TABLE tablename[PARTITION(partcol1[=val1], partcol2[=val2], ...)]COMPUTE STATISTICS [noscan];

列信息统计:

ANALYZE TABLE tablename[PARTITION(partcol1[=val1], partcol2[=val2], ...)]COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];

当表存在分区时,需要在命令中指定,否则会报错;不支持使用列与表的别名。
noscan,不会扫描file,因此速度较快,但只能得到文件数numfiles和HDFS存储总大小totalSize;否则,还会包含行数numRows和原始数据大小rawDataSize(未压缩)。

查看表的统计信息:

desc formatted [table_name] [column_name];

删除表的统计信息:

ANALYZE TABLE [table_name] delete STATISTICS;

例子:
如果table1有4分分区:
• Partition1: (ds='2013-03-08', hr=11)
• Partition2: (ds='2013-03-08', hr=12)
• Partition3: (ds='2013-03-09', hr=11)
• Partition4: (ds='2013-03-09', hr=12)

如果只搜集最后一个增量分区的信息:

ANALYZE TABLE Table1 PARTITION(ds='2013-03-09', hr=12) COMPUTE STATISTICS;

同时搜集第三和第四个分区信息:

ANALYZE TABLE Table1 PARTITION(ds='2013-03-09', hr) COMPUTE STATISTICS;

搜集表的所有分区信息:

ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS;

针对未分区的表进行信息搜集:

ANALYZE TABLE Table1 COMPUTE STATISTICS;

        在后续涉及到count、distinct等统计计算时,可以采用预先优化的方式来进行。
        因为分析优化器自身过程也耗时较多,所以在使用时要考虑它的使用率,如果在下一次表数据更新之前,优化器只被使用了一次,此时优化器反而降低了整体的执行效率。工作中我们要根据实际情况,避免过度优化。
        注意:hive.stats.autogather控制是否自动统计(noscan),默认开启,无需修改。

7.3 使用

Analyze优化器要结合CBO引擎来使用。

--计算出统计信息analyze table 表名 compute statistics;analyze table 表名 compute statistics for columns;--之后就设置以下属性启用CBOset hive.cbo.enable=true;--统计进行使用元数据的信息set hive.compute.query.using.stats=true;set hive.stats.fetch.column.stats=true;set hive.stats.fetch.partition.stats=true;--经过以上设置,现在下面每个查询都可使用基于代价的优化引擎select 语句...

CBO引擎可以自动优化,选择合适的Join算法。
使用Analyze+CBO可以最大化的优化执行计划。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。