HiveSQL常⽤优化⽅法经验总结
1. 写在前⾯的话
此处省略150字…
2. Hive中解决数据倾斜的场景
2.1 ⼤表Join⼩表时的数据倾斜(map join)
  在⼤表Join⼩表时,解决数据倾斜最好的⽅式是使⽤Map Join,避免Shuffle,从⽽也避免了数据倾斜. map join主要通过下⾯的参数来调节:
#默认是true
set vert.join=true  --开启map join  //1.x版本及以后默认是开启的
#设置⼩表的⼤⼩,⽣产环境可以适当调⼤
set hive.mapjoin.smalltable.filesize=25000000 --默认⼩表⼩于25mb
  在map join中,还有如下2个参数经常使⽤:
set ditionaltask=true;
set ditionaltask.size=60000000;
说明:
1. 第⼀个参数的作⽤是,在内连接中,除了第⼀个表之外的其它表是⼩表的情况,将普通的join转化为Map join时,是否将多个Map join合并
为⼀个Map Join.设为true表⽰合并.
2. 第⼆个参数是限定表的⼤⼩,多个Map join合并成⼀个Map join时,其⼩表的总⼤⼩必须⼩于该值才合并.
3. 在⼩表left join⼤表时,不能将⼩表写⼊内存,否则得到错误的结果.inner join,⼩表在左边或右边都可以.full out join不能使⽤map join.
2.2 ⼤表Join⼤表时的数据倾斜
  上⾯的Map Join解决了⼩表关联的问题,假如关联的是2个⼤表就不太适⽤了.⼩表不⼩,所以不太适
合加载进内存,那么Map Join肯定是不适合的;两个表都不⼩,所以必然带来⼤量的⽹络IO和磁盘IO,所以Reduce Join也不适合.这个时候就可以使⽤SMB Join,并带来性能很⼤的提升.
  使⽤SMB Join的前提: 1.创建表时必须是创建分桶表;2.创建表时必须指定sort by排序;3.分桶的字段与sort by排序的字段且和两表关联的字段必须是⼀样的.
force.bucketing=true; --默认为false,开启后,则写⼊数据时会⾃动分桶
force.sorting=true; --默认为false,开启后,插数据到表中会进⾏强制排序
#如果希望SMB Join能够转换为SMB Map Join,还需要设置以下参数
set vert.sortmerge.join=true;
set hive.optimize.bucketmapjoin =true;
set hive.optimize.bucketmapjoin.sortedmerge =true
原理说明:
  在map join的时候,两个表是以相同的⽅式来划分桶的,则处理左边表的某个桶的时候,Mapper是知
道表内对应的记录也在右边表的相同的桶内.因此Mapper只需要获取对应的那个桶,然后进⾏连接就可以.桶中的数据是根据相同的字段进⾏排序,这样每个桶的join就变成了merge sort,可以进⼀步提升map join效率.
2.3 group by引起的数据倾斜
2.3.1 key的基数少(即key的值种类不多),但数据量很⼤引起的数据倾斜
#默认为fasle,这样就会开始map端聚合combiner.可以减少shuffle的数据量.
set hive.map.aggr=true;
2.3.2 个别key的值数据量⼤,造成这类key的reduce执⾏太久引起的数据倾斜
#默认为fasle,应设置为true
upby.skewindata=true;
#设置group by后⾯的键的记录数超过该值就会优化
upby.mapaggr.checkinterval=100000;
#预先取100000条数据聚合,如果聚合后的条数/1000000 > 0.5,则不再预聚合
sql语句优化方式set hive.map.aggr.duction=0.5;
其原理是,在group by时启动2个MR job,第⼀个job会将map端数据随机分发到各个reducer,每个reducer做部分聚合,相同key的所有数据就会分布在不同的reducer中.第⼆个job再将前⾯预处理过程过的数据按key聚合并输出结果,这样就启到了均衡的效果.
2.3.3 count(distinct)引起的数据倾斜
  当我们需要对某个字段数据进⾏去重统计时,如果数据量很⼤,count(distinct)就会⾮常慢,为了保证全局去重,count(distinct)只会有⼀个reduce来处理.这时可以⽤group by来改写.但是这样会启动两个MR job(单纯的distinct只会启动⼀个).所以如果数据量不是太⼤的话,使⽤count(distinct)或许更快.因为这时还使⽤group by的话,需要启动2个MR,就会耗费更多的时间.
2.3.4 两表Join时引起的数据倾斜
2.3.4.1 关联字段空值过多的情况
实际场景:
  在⽇志中,常会有信息丢失的问题,⽐如⽇志中的user_id,如果取其中的user_id和⽤户表中的user_id关联,会碰到数据倾斜的问题.因为这些空值在join时会被分配到⼀个reduce,拖累进度.(空值字段是连接字段的情况)
解决办法:
  如果user_id为null的数据不需要,可以直接过滤掉,不参与关联.
select
user_id
from
(
select user_id from log where log.user_id is not null
)t
join users on t.user_id=users.user_id
  如果user_id为null的需要保留可以使⽤下⾯2种⽅式解决数据倾斜:
#先单独取出user_id为null的,
select
user_id
from
(
select user_id from log where log.user_id is not null
)t
join users on t.user_id=users.user_id
union all
select * from log where user_id is null
#在进⾏join时,将null值⽤随机值替代然后进⾏join
select *
from log left
join users
on case when log.user_id is null then cancat("hive",rand())else log.user_id = users.user_id
说明:
  1. 在给定随机值的时候注意产⽣的随机值不要与另⼀张表有关联上的可能,⼀定要保证产⽣的随机值是关联不上的,因为空值本来就是关联不上的,不要改变了关联了结果.变为随机值也不要影响最终结果.
  2. 把空值的key变成⼀个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题.⽽这些分散到各个reduce上的数据由于是随机值,不会产⽣关联,不会影响最后关联的结果.
2.3.4.2 关联字段空值过多的情况
  ⽤户表中user_id字段类型为int , log表中user_id的字段类型为string.当两表进⾏关联,user_id作为关联字段时,默认的hash操作会按照int类型的user_id来进⾏分配,这样会导致所有string类型的user_id记录都会分配到同⼀个Reducer中,造成严重的数据倾斜.
解决办法: 把数字类型转换成字符串类型.
on user.user_id=cast(log.user_id as string)
3. Hive中解决⼩⽂件的问题
3.1 ⼩⽂件产⽣的原因
1. 对于MR任务,reducer的任务数过多,会有⼤量的⼩⽂件输出.
3.2 ⼩⽂件带来的问题
1. 如果有⼤量的⼩⽂件,就会产⽣⼤量的元数据信息,那么就会占⽤namenode⼤量的内存空间,影响namenode的性能.
2. ⼤量的⼩⽂件就会产⽣⼤量的map个数,调度时间较长,影响执⾏效率.
3.3 解决⼩⽂件问题
3.3.1 通过JVM重⽤解决⼩⽂件产⽣的⼤量map数
  在MR job中,默认是每执⾏⼀个task就启动⼀个JVM.如果task⾮常⼩⽽碎,那么JVM启动和关闭的耗时就会很长.可以通过调节参数use.jvm.num.tasks的值来达到重⽤的⽬的(默认值为1).例如将这个参数设成5,那么就代表同⼀个MR job中顺序执⾏的5个task可以重复使⽤⼀个JVM,减少启动和关闭的开销.但它对不同MR job中的task⽆效.
  如果⼤量的⼩⽂件启动⼤量的map,可以通过开启JVM重⽤来解决.特别适合很难避免⼩⽂件或task特别多的场景,这类场景⼤多执⾏时间都很短.hadoop默认配置是使⽤派⽣JVM来执⾏map和reduce任务的,这时jvm的启动过程可能会造成相当⼤的开销,尤其是执⾏的job包含⼤量的task任务的情况时.
JVM重⽤可以使得JVM实例在同⼀个job中重新使⽤N次,N的值在Hive中可以通过set use.jvm.num.tasks参数来设置.
  JVM重⽤的缺点是⼀直占着资源,以便进⾏重⽤.所以如果有⼏个task迟迟执⾏不成功,那么被占着的资源迟迟得不到释放,那么被占着的插槽⼀直空闲确⽆法被其它job使⽤,直到所有task都结束才会释放.
3.3.1 在Mapper输⼊阶段对⼩⽂件进⾏合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
设置上⾯的参数之后就会⾃动合并⼩⽂件,具体合并的⼤⼩由下⾯的参数决定:
#最⼤的split⼤⼩.⼀个split就对应1个map. 默认为256MB,可以调⼤
mapred.max.split.size=256000000; --决定每个map处理的最⼤的⽂件⼤⼩,单位B
#单个节点最⼩的split的⼤⼩.默认为1Byte,需要调⼤.那么⼩于这个数的⽂件就会合并
mapred.min.split.de=1; --⼀个节点上split的⾄少的⼤⼩
#单个机架最⼩的split的⼤⼩
mapred.min.split.size.per.rack=1;  --⼀个机架中split的⾄少的⼤⼩
3.3.2 在MR的输出阶段进⾏合并
#在只有map阶段时进⾏合并
mapfiles=true
#MR引擎适⽤,在reduce阶段合并
mapredfiles=true
#tez引擎适⽤
tezfiles=true
#合并后期望的⽂件⼤⼩
size.per.task=256000000(默认256MB)
#当输出⽂件⼤⼩的平均值⼤⼩⼩于该值时,启动⼀个独⽴的mapReduce任务进⾏⽂件merge
smallfiles.avgsize=16000000(默认16MB)
4. 适当开启压缩
  当数据⽐较⼤的时候,可以考虑压缩数据.Snappy压缩率低,但压缩,解压速度最快.
  压缩Job的中间结果数据和输出数据.具体的压缩⽅式根据场景来定,如果需要考虑分⽚,可以使⽤lzo压缩,为压缩⽂件建⽴索引之后,就可以进⾏分⽚了.如果不考虑分⽚,可以使⽤snappy压缩,因为它的压缩速度是最快的.
  开启map和reduce中间过程的压缩,减少map和ruduce task之间的数据传输量.
press.intermediate=true; --默认为false
--设置解码格式
set mapred.dec=org.apache.hadoop.iopress.SnappyCodec;
  Hive的最终输出结果压缩:
press.output=true; --默认为false,开启最终压缩
set pe=BLOCK; --默认为record
--设置压缩⽅式,这⾥为snappy
set dec=org.apache.hadoop.iopress.SnappyCodec;
set mapreduce.output.fileoutputformatpress=true;
说明:
  通常,为了减轻存储和数据传输,提升效率,我们都会选择压缩,但压缩的不同格式有不同的适合场景,这个每个公司都有差异,需要根据实际情况来选择。
5. 开启并⾏执⾏
  开启并⾏执⾏,在同⼀个应⽤中,Hive中互相没有依赖关系的job间是可以并⾏执⾏的,最典型的就是多个⼦查询union all,在集资源相对充⾜的情况下,可以开启并⾏执⾏,即将参数parallel设为true(默认为false).另外parallel.thread.number可以设定并⾏执⾏的线程数,默认为8,⼀般都够⽤(即指定并⾏的最⼤job个数,默认为8个)。
  在有union all的场景下,特别适⽤,多个任务之间没有依赖,独⽴运⾏.在资源不是核⼼瓶颈的前提下,可以直接缩短运⾏时间.如果集只有数台机器,资源有限,开并⾏会导致资源紧张,这种⽅式就不⼀定能提到调休的效果了.
6. 开启本地执⾏
  Hive也可以不将任务提交到集进⾏运算,⽽是直接在⼀台节点上处理。因为消除了提交到集的消耗.所以⽐较适合数据量很⼩,且逻辑不复杂的任务。要启⽤本地模式,需要设置以下参数:
ode.local.auto=true; --默认为false,设为ture开启本地执⾏
ode.local.auto.inputbytes.max=134217728; --任务输⼊的总数据量必须⼩于设定的值,默认128MB
ode.local.auto.tasks.max=4; --且Mapper的数量必须⼩于设定的值.默认为4
reduce的数量必须0或1个
7. 开启严格模式
  所谓严格模式,就是强制不允许⽤户执⾏3种有风险的HiveSQL语句,⼀旦执⾏就会直接失败.设置de的值为strict可以开启严格模式.
严格模式开启后,会禁⽌以下3类查询:
1. 查询分区表时不限定分区列的语句在严格模式下执⾏失败
2. 两表join时产⽣了笛卡尔积的语句在严格模式下执⾏失败(没有连接条件或连接条件失效都会产⽣笛卡尔积)
3. ⽤order by来排序但没有指定limit的语句在严格模式下也会执⾏失败.
8. 选择合适的存储格式
  因为在Hive中很多场景下,我们并不需要查询每⾏数据中的所有字段,更多的时候是查询需要的某⼏个字段,这时可以考虑使⽤列式存储格式,⽐如parquet和orc格式.如果仅仅是使⽤HiveSQL查询,可以选⽤ORC格式,性能是最好的.如果考虑到多分析引擎的使⽤,⽐如
Spark,Impala等,可以考虑Parquet格式.
9. 调整Map的数量
Map数量是否是越多越好?
  如果⼀个任务有很多⼩⽂件(<<128M),每个⼩⽂件也会被当做⼀个数据块,⽤⼀个mapTask来完成.每个mapTask启动和初始化时间>>处理时间,会造成资源浪费,⽽且系统中同时可⽤的map数量是有限的.所以对于⼩⽂件采⽤的策略是合并.
每个Map处理接近128M的⽂件,就是最合适的么?
  有⼀个125M的⽂件,⼀般情况下⽤⼀个MapTask来完成.假设这个⽂件字段很少,但记录数却⾮常多,如果map函数处理的逻辑⽐较复杂,⽤⼀个mapTask去做的话,性能也不好.对于这种复杂⽂件采⽤的策略是增加Map数量.
可以通过下⾯的⽅式调节map的数量:
computeSplitSize(max(minSize,min(maxSize,blocksize)))
minSize : mapred.min.split.size(默认值为1)
maxSize : mapred.max.split.size(默认值256M)
如果都是默认参数那么计算出来就是按128MB作为⼀个分⽚.
也可调整maxSize的最⼤值可以改变map的数量.
1.如果让maxSize最⼤值⼩于blocksize就可以增加map的个数.假如maxSize的
值调为64,那么分⽚⼤⼩计算出来就为64
10. 调整reducer的数量
  reducer数量的确定⽅法⽐mappper简单的多,可以直接使⽤duce.tasks参数来设置.如果未设置该参数,Hive会⾃动推测Reducer的数量,推测逻辑如下:
1. 参数ducers.ducer⽤来设定每个reducer能够处理的最⼤数据量,默认值256MB.
2. 参数ducers.max⽤来设定每个job最多能启动的reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)
3. 计算出reducer数量=min(reduce的输⼊数据总量/256M,1009)
reducer的数量与输出⽂件的数量有关,如果reducer数太多,会产⽣⼤量⼩⽂件,对HDFS造成压⼒.如果redcuer数量太少,每个reducer 要处理很多数据,容易拖慢运⾏时间或者造成OOM.
说明:
也有些情况是固定只有⼀个reduce的(不管有没指定reduce数量):
a.没有group by的汇总
b.使⽤order by全局排序
c.笛卡尔积
但这⼏种情况⼀般是我们需要避免的,因为会造成性能瓶颈.
11.其它⽅⾯的优化
11.1 列裁剪和分区裁剪
  所谓列裁剪就是在查询时只读取需要的列.分区裁剪就是只读取需要的分区.创建分区表,避免全表扫描.
11.2 谓词下推
  简单理解,在进⾏join操作时,先使⽤where条件过滤出需要的⾏数据,再进⾏join,这样就能减少下游处理的数据量.⽽不是先全表join,再进⾏过滤.
12. 减少job数优化举例
  启动⼀次job尽可能的多做事情,⼀个job能完成的事情,不要两个job来做.可以尽量减少job的数量,即减少MR的数量,让⼀个job完成更多的功能.提升执⾏速度.
12.1 尽量使⽤union all替换union
  ⼀般在实际场景中不会使⽤union,因为union除了将⼏张表拼接在⼀起,还会去重,会增加job的数量.union all会单纯的将⼏张表拼接在⼀起,不会产⽣额外的job,如果关联在⼀起后真的要去重,也不使⽤union,⽽是⽤group by来去重.
12.2 有多张表要分别求出每张表的记录数优化的场景

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。