Why select count(distinct xx) from table is slow in Hive, and how to optimize it
1. Problem description
In Hive, when the data volume is large, select count(distinct xx) from table can be extremely slow.
An example:
Table name: loan_trans
Row count: 551,353,635
Storage size: 52.93 GB
select count(distinct id)
from loan_trans
where etl_tx_dt = 20200202
number of mappers: 228; number of reducers: 1
Start-end time: 7:48-7:57 (about 9 minutes)
Result: 3282768
2. Root cause analysis
After being converted into a MapReduce job, the statement executes as sketched below.
[Figure: MapReduce execution flow of the original statement]
Because DISTINCT is involved, the map phase cannot use a combiner to deduplicate its output: every id must be emitted as a key, and deduplication only happens in the reduce phase, where records with the same key from different map tasks are merged and counted into the final result.
Note that the job runs with a single reduce task. For a dataset this size, the entire map output is funneled through that one task, which must shuffle, sort, and aggregate a huge volume of data, making it the I/O and compute bottleneck of the whole job.
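The single-reducer plan is easy to confirm before running anything. A minimal sketch using EXPLAIN (output abbreviated; the exact text varies by Hive version):

explain
select count(distinct id)
from loan_trans
where etl_tx_dt = 20200202;
-- the printed plan contains a single MapReduce stage whose
-- Reduce Operator Tree holds one Group By Operator performing
-- the count(DISTINCT ...) aggregation over all shuffled ids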
Based on this analysis, we first tried to explicitly raise the number of reduce tasks to increase parallelism in the reduce phase, aiming to keep each reduce task's input at roughly 2 GB. The setting:
set mapred.reduce.tasks=100;
After the change, however, the parameter had no effect on the actual number of reduce tasks; Hive printed "Number of reduce tasks determined at compile time: 1". It turns out that when Hive evaluates a "full aggregate" such as this COUNT over the entire result set, it ignores the user-specified reduce task count and forces it to 1.
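For reference, these are the two knobs that normally control reduce parallelism in Hive on MapReduce; a sketch (parameter names from that era, and neither overrides the forced single reducer of a full aggregate):

set mapred.reduce.tasks=100;                          -- fixed reducer count (mapreduce.job.reduces on newer stacks)
set hive.exec.reducers.bytes.per.reducer=2000000000;  -- or let Hive size each reducer at ~2 GB of input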
3. Optimization
The only way around this restriction is a workaround. Using Hive's support for nested queries, we convert the original single MapReduce job into two jobs: the first stage selects all distinct ids, and the second stage counts those already-deduplicated ids. In the first stage we can raise the reduce parallelism and process the map output concurrently. In the second stage, since the ids are already distinct, the COUNT(*) map phase no longer needs to emit the raw id data, only a merged partial count; so even though Hive still forces a single reduce task for this stage, its tiny map input cannot make that one reducer a bottleneck. The rewritten SQL:
select count(*)
from
(
  select id
  from loan_trans
  where etl_tx_dt = 20200202
  group by id
) t
Or:
select count(*)
from
(
  select distinct id
  from loan_trans
  where etl_tx_dt = 20200202
) t
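To actually obtain the first-stage parallelism, the reducer settings from section 2 can be applied before the rewritten query; a minimal sketch:

set mapred.reduce.tasks=100;  -- honored now: stage 1 is a GROUP BY, not a full aggregate
select count(*)
from
(
  select id
  from loan_trans
  where etl_tx_dt = 20200202
  group by id
) t;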
The two forms compile to the same execution plan, shown below:
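Presumably the plan was captured with EXPLAIN; a sketch of the command:

explain
select count(*)
from
(
  select id
  from loan_trans
  where etl_tx_dt = 20200202
  group by id
) t;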
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: loan_trans
            filterExpr: (etl_tx_dt = 20200202) (type: boolean)
            Statistics: Num rows: 551353635 Data size: 20400084495 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: bigint)
              outputColumnNames: id
              Statistics: Num rows: 551353635 Data size: 20400084495 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                keys: id (type: bigint)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 551353635 Data size: 20400084495 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: bigint)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: bigint)
                  Statistics: Num rows: 551353635 Data size: 20400084495 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          keys: KEY._col0 (type: bigint)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 275676817 Data size: 10200042229 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            Statistics: Num rows: 275676817 Data size: 10200042229 Basic stats: COMPLETE Column stats: NONE
            Group By Operator
              aggregations: count()
              mode: hash
              outputColumnNames: _col0
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
At actual run time we found that Hive applies a further optimization across these two stages: it moves the count from the second job's map phase into the first job's reduce phase (visible in the plan above, where Stage-1's Reduce Operator Tree already contains the count() Group By Operator). The first stage's reducers thus emit partial counts instead of the full set of deduplicated ids, which drastically reduces the first job's reduce output I/O and the second job's map input. In the same environment, the optimized statement finished in roughly 20% of the original query's time. The optimized MapReduce job flow is sketched below.
[Figure: MapReduce job flow after optimization]
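Incidentally, the hash-mode Group By Operator inside Stage-1's map tree is Hive's map-side partial aggregation, which already deduplicates ids within each map task before the shuffle. It is governed by a setting worth knowing about; a sketch (it defaults to true in modern Hive versions):

set hive.map.aggr=true;  -- enables the map-side hash Group By seen in Stage-1, cutting shuffle volume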
Execution results:
Progress: INFO : Stage-Stage-1: Map: 228 Reduce: 847 Cumulative CPU: 13120.27 sec HDFS Read: 2831413787 HDFS Write: 98252 SUCCESS
Progress: INFO : Stage-Stage-2: Map: 142 Reduce: 1 Cumulative CPU: 367.8 sec HDFS Read: 544869 HDFS Write: 8 SUCCESS
Start-end time: 10:12-10:14 (about 2 minutes)
Result: 3282768
4. Conclusion
The second run was roughly five times faster than the first (about 2 minutes versus about 9). So when a deduplicated count is needed on a large table, replace
select count(distinct id) from tablename
with
select count(*) from (select distinct id from tablename) t
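The same rewrite applies to any large-table distinct count. A minimal sketch with hypothetical table and column names (events, user_id):

-- slow: the full aggregate forces all ids through a single reducer
select count(distinct user_id) from events where dt = '2020-02-02';

-- fast: stage 1 deduplicates across many reducers, stage 2 counts a tiny input
select count(*)
from
(
  select user_id
  from events
  where dt = '2020-02-02'
  group by user_id
) t;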