Spark SQL optimization: small-table/large-table join — replacing OR with UNION, and broadcast join
---- Original statement (runs in 18 min)
SELECT bb.ip
FROM (
    SELECT
        ip,
        sum(click) click_num,
        round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
        AND ip IS NOT NULL
        AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN (
    SELECT round(sum(click) / sum(imp), 4) avg_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
) aa
LEFT OUTER JOIN schema.dstable cc ON cc.ip = bb.ip
WHERE cc.ip IS NULL
    AND (
        bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 500
    )
    OR (click_num > 1000)
Analysis:
1. Table aa holds a single computed metric — one row — so it is the small table.
2. Table bb holds per-IP aggregated detail data with many rows, so it is the large table.
3. Table cc is used only to filter out IPs; it is also small and plays a minor role.
Inspecting the execution plan shows that the LEFT OUTER JOIN between bb and aa triggers a shuffle, generating heavy disk and network I/O and hurting performance.
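The shuffle can be confirmed directly from the plan with EXPLAIN. A minimal sketch using the same table and date as above; the exact operator names (Exchange, SortMergeJoin, BroadcastHashJoin) vary by Spark version:

```sql
-- Print the physical plan; an Exchange node feeding the join marks the shuffle.
EXPLAIN
SELECT bb.ip
FROM (
    SELECT ip, sum(click) click_num
    FROM schema.srctable1
    WHERE date = '20171020' AND ip IS NOT NULL AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN schema.dstable cc ON cc.ip = bb.ip;
```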
Solution strategies
Option 1: swap the large and small tables, putting the small table on the left; run time drops to 29 s. (The author never fully understood why this helps: the execution plan shows nothing but the two tables changing positions, with no other difference from before.)
Option 2: rewrite OR as UNION; run time drops to 35 s. (After much experimenting, the OR was suspected; replacing it with UNION while changing nothing else did indeed change the efficiency. Yet option 1 only reordered the tables and improved things just as much, so exactly which internal Spark SQL optimization kicks in remains to be studied.)
Option 3: use cache + broadcast; run time drops to 20 s. (The small table is cached in memory and the join is performed on the map side.)
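On newer Spark versions, option 3 can also be expressed as a join hint instead of a separate cache step. A sketch, assuming Spark 2.2+ (where the BROADCAST/MAPJOIN SQL hints exist); since aa is a single row, it is written here as a CROSS JOIN:

```sql
-- Hint the optimizer to broadcast the one-row aggregate aa (Spark 2.2+).
SELECT /*+ BROADCAST(aa) */ bb.ip
FROM (
    SELECT ip, sum(click) click_num,
           round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020' AND ip IS NOT NULL AND imp > 0
    GROUP BY ip
) bb
CROSS JOIN (
    SELECT round(sum(click) / sum(imp), 4) avg_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
) aa
WHERE bb.user_click_rate > aa.avg_click_rate * 3 AND click_num > 500
   OR click_num > 1000;
```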
Implementation details
---- Option 2: OR rewritten as UNION (runs in 35 s)
SELECT aa.ip
FROM (
    SELECT bb.ip ip
    FROM (
        SELECT
            ip,
            sum(click) click_num,
            round(sum(click) / sum(imp), 4) user_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
            AND ip IS NOT NULL
            AND imp > 0
        GROUP BY ip
    ) bb
    LEFT OUTER JOIN (
        SELECT round(sum(click) / sum(imp), 4) avg_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
    ) aa
    WHERE bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 20
    UNION
    SELECT bb.ip ip
    FROM (
        SELECT
            ip,
            sum(click) click_num,
            round(sum(click) / sum(imp), 4) user_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
            AND ip IS NOT NULL
            AND imp > 0
        GROUP BY ip
    ) bb
    LEFT OUTER JOIN (
        SELECT round(sum(click) / sum(imp), 4) avg_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
    ) aa
    WHERE click_num > 40
) aa
LEFT OUTER JOIN schema.dstable cc ON aa.ip = cc.ip
WHERE cc.ip IS NULL
----- Cache + broadcast approach (runs in 20 s)
Principle: broadcast ships the small table to every executor node, so the join runs locally; the shuffle is essentially eliminated and run time improves dramatically.
CACHE TABLE cta AS
SELECT round(sum(click) / sum(imp), 4) avg_click_rate
FROM schema.srctable1
WHERE date = '20171020';
INSERT INTO TABLE schema.dstable
SELECT bb.ip
FROM (
    SELECT
        ip,
        sum(click) click_num,
        round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
        AND ip IS NOT NULL
        AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN cta aa
LEFT OUTER JOIN schema.dstable cc ON cc.ip = bb.ip
WHERE cc.ip IS NULL
    AND (
        bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 500
    )
    OR (click_num > 1000)
Note:
A cached table is not necessarily broadcast to the executors for a map-side join; this also depends on the parameter spark.sql.autoBroadcastJoinThreshold, which decides whether the table gets broadcast.
spark.sql.autoBroadcastJoinThreshold defaults to 10 MB, so only cached tables smaller than 10 MB are broadcast to the executors for a map-side join.
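The threshold can be adjusted per session. A sketch — the 100 MB value is illustrative, not from the original tuning run:

```sql
-- Default is 10485760 bytes (10 MB); setting -1 disables broadcast joins entirely.
SET spark.sql.autoBroadcastJoinThreshold = 104857600;  -- allow tables up to 100 MB
```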
