leftjoin⼀对多只取⼀条_SparkSQL的3种Join实现
引⾔Join是SQL语句中的常⽤操作,良好的表结构能够将数据分散在不同的表中,使其符合某种范式,减少表冗余、更新容错等。⽽建⽴表和表之间关系的最佳⽅式就是Join操作。
对于Spark来说有3中Join的实现,每种Join对应着不同的应⽤场景:
Broadcast Hash Join :适合⼀张较⼩的表和⼀张⼤表进⾏join
Shuffle Hash Join : 适合⼀张⼩表和⼀张⼤表进⾏join,或者是两张⼩表之间的join
Sort Merge Join :适合两张较⼤的表之间进⾏join
前两者都基于的是Hash Join,只不过在hash join之前需要先shuffle还是先broadcast。下⾯将详细的解释⼀下这三种不同的join的具体原理。
Hash Join先来看看这样⼀条SQL语句:
select * from order,item where item.id = order.i_id
1. 确定Build Table以及Probe Table:这个概念⽐较重要,Build Table使⽤join key构建Hash Table,⽽Pr
obe Table使⽤join key进
⾏探测,探测成功就可以join在⼀起。通常情况下,⼩表会作为Build Table,⼤表作为Probe Table。此事例中item为Build
Table,order为Probe Table;很简单⼀个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采⽤的是hash join算法,整个过程会经历三步:
1. 构建Hash Table:依次读取Build Table(item)的数据,对于每⼀⾏数据根据join key(item.id)进⾏hash,hash到对应的
Bucket,⽣成hash table中的⼀条记录。数据缓存在内存中,如果内存放不下需要dump到外存;
1. 探测:再依次扫描Probe Table(order)的数据,使⽤相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件
(item.id = order.i_id),如果匹配成功就可以将两者join在⼀起。
基本流程可以参考上图,这⾥有两个⼩问题需要关注:
1. hash join性能如何?很显然,hash join基本都只扫描两表⼀次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条
街;
1. 为什么Build Table选择⼩表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最⾼;这也决定了hash join算法只
适合⾄少⼀个⼩表的join场景,对于两个⼤表的join场景并不适⽤。
上⽂说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过⼀定的分布式改造,说到底就是尽可能利⽤分布式计算资源进⾏并⾏化计算,提⾼总体效率。hash join分布式改造⼀般有两种经典⽅案:
1. broadcast hash join:将其中⼀张⼩表⼴播分发到另⼀张⼤表所在的分区节点上,分别并发地与其上的分区记录进⾏hash join。
broadcast适⽤于⼩表很⼩,可以直接⼴播的场景;
1. shuffler hash join:⼀旦⼩表数据量较⼤,此时就不再适合进⾏⼴播分发。这种情况下,可以根据join key相同必然分区相同的原
理,将两张表分别按照join key进⾏重新组织分区,这样就可以将join分⽽治之,划分为很多⼩join,充分利⽤集资源并⾏化。
Broadcast Hash Join⼤家知道,在数据库的常见模型中(⽐如星型模型或者雪花模型),表⼀般分为两种:事实表和维度表。维度表⼀般指固定的、变动较少的表,例如联系⼈、物品种类等,⼀般数据有限。⽽事实表⼀般记录流⽔,⽐如销售清单等,通常随着时间的增长不断膨胀。
因为Join操作是对两个表中key值相同的记录进⾏连接,在SparkSQL中,对两个表做Join最直接的⽅式是先根据key分区,再在每个分区中把key值相同的记录拿出来做连接操作。但这样就不可避免地涉及到shuffle,⽽shuffle在Spark中是⽐较耗时的操作,我们应该尽可能的设计Spark应⽤使其避免⼤量的shuffle。
当维度表和事实表进⾏Join操作时,为了避免shuffle,我们可以将⼤⼩有限的维度表的全部数据分发到每个节点上,供事实表使⽤。executor存储维度表的全部数据,⼀定程度上牺牲了空间,换取shuffle操作⼤量的耗时,这在SparkSQL中称作Broadcast Join,如下图所⽰:
Table B是较⼩的表,⿊⾊表⽰将其⼴播到每个executor节点上,Table A的每个partition会通过block manager取到Table A的数据。根据每条记录的Join Key取到Table B中相对应的记录,根据Join Type进⾏操作。这个过程⽐较简单,不做赘述。
Broadcast Join的条件有以下⼏个:
1. 被⼴播的表需要⼩于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint)
1. 基表不能被⼴播,⽐如left outer join时,只能⼴播右表
看起来⼴播是⼀个⽐较理想的⽅案,但它有没有缺点呢?也很明显。这个⽅案只能⽤于⼴播较⼩的表,否则数据的冗余传输就远⼤于shuffle的开销;另外,⼴播时需要将被⼴播的表现collect到driver端,当频繁有⼴播出现时,对driver的内存也是⼀个考验。
如下图所⽰,broadcast hash join可以分为两步:
1. broadcast阶段:将⼩表⼴播分发到⼤表所在的所有主机。⼴播算法可以有很多,最简单的是先发给driver,driver再统⼀分发给所有
executor;要不就是基于bittorrete的p2p思路;
1. hash join阶段:在每个executor上执⾏单机版hash join,⼩表映射,⼤表试探;
SparkSQL规定broadcast hash join执⾏的基本条件为被⼴播⼩表必须⼩于参数spark.sql.autoBroadcastJoinThreshold,默认为
10M。sql left join 多表连接
Shuffle Hash Join当⼀侧的表⽐较⼩时,我们选择将其⼴播出去以避免shuffle,提⾼性能。但因为被
⼴播的表⾸先被collect到driver 段,然后被冗余分发到每个executor上,所以当表⽐较⼤时,采⽤broadcast join会对driver端和executor端造成较⼤的压⼒。
但由于Spark是⼀个分布式的计算引擎,可以通过分区的形式将⼤批量的数据划分成n份较⼩的数据集进⾏并⾏计算。这种思想应⽤到Join 上便是Shuffle Hash Join了。利⽤key相同必然分区相同的这个原理,两个表中,key相同的⾏都会被shuffle到同⼀个分区
中,SparkSQL将较⼤表的join分⽽治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进⾏Hash Join,这样即在⼀定程度上减少了driver⼴播⼀侧表的压⼒,也减少了executor端取整张被⼴播表的内存消耗。其原理如下图:
Shuffle Hash Join分为两步:
1. 对两张表分别按照join keys进⾏重分区,即shuffle,⽬的是为了让有相同join keys值的记录分到对应的分区中
1. 对对应分区中的数据进⾏join,此处先将⼩表分区构造为⼀张hash表,然后根据⼤表分区中记录的join keys值拿出来进⾏匹配Shuffle Hash Join的条件有以下⼏个:
1. 分区的平均⼤⼩不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
1. 基表不能被⼴播,⽐如left outer join时,只能⼴播右表
1. ⼀侧的表要明显⼩于另外⼀侧,⼩的⼀侧将被⼴播(明显⼩于的定义为3倍⼩,此处为经验值)
我们可以看到,在⼀定⼤⼩的表中,SparkSQL从时空结合的⾓度来看,将两个表进⾏重新分区,并且对⼩表中的分区进⾏hash化,从⽽完成join。在保持⼀定复杂度的基础上,尽量减少driver和executor的内存压⼒,提升了计算时的稳定性。
在⼤数据条件下如果⼀张表很⼩,执⾏join操作最优的选择⽆疑是broadcast hash join,效率最⾼。但是⼀旦⼩表数据量增⼤,⼴播所需内存、带宽等资源必然就会太⼤,broadcast hash join就不再是最优⽅案。此时可以按照join key进⾏分区,根据key相同必然分区相同的原理,就可以将⼤表join分⽽治之,划分为很多⼩表的join,充分利⽤集资源并⾏化。如下图所⽰,shuffle hash join也可以分为两步:
1. shuffle阶段:分别将两个表按照join key进⾏分区,将相同join key的记录重分布到同⼀节点,两张表的数据会被重分布到集中所
有节点。这个过程称为shuffle
1. hash join阶段:每个分区节点上的数据单独执⾏单机hash join算法。
看到这⾥,可以初步总结出来如果两张⼩表join可以直接使⽤单机版hash join;如果⼀张⼤表join⼀张极⼩表,可以选择broadcast hash join算法;⽽如果是⼀张⼤表join⼀张⼩表,则可以选择shuffle hash join算法;那如果是两张⼤表进⾏join呢?
Sort Merge Join上⾯介绍的两种实现对于⼀定⼤⼩的表⽐较适⽤,但当两个表都⾮常⼤时,显然⽆论
适⽤哪种都会对计算内存造成很⼤压⼒。这是因为join时两者采取的都是hash join,是将⼀侧的数据完全加载到内存中,使⽤hash code取join keys值相等的记录进⾏连接。
当两个表都⾮常⼤时,SparkSQL采⽤了⼀种全新的⽅案来对表进⾏Join,即Sort Merge Join。这种实现⽅式不⽤将⼀侧数据全部加载后再进星hash join,但需要在join前将数据排序,如下图所⽰:
可以看到,⾸先将两张表按照join keys进⾏了重新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进⾏排序,排序后再对相应的分区内的记录进⾏连接,如下图⽰:
看着很眼熟吧?也很简单,因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边⼩就继续取左边,反之取右边。
可以看出,⽆论分区有多⼤,Sort Merge Join都不⽤把某⼀侧的数据全部加载到内存中,⽽是即⽤即取即丢,从⽽⼤⼤提升了⼤数据量下sql join的稳定性。
SparkSQL对两张⼤表join采⽤了全新的算法-sort-merge join,如下图所⽰,整个过程分为三个步骤:
1. shuffle阶段:将两张⼤表根据join key进⾏重新分区,两张表数据会分布到整个集,以便分布式并⾏处理;
1. sort阶段:对单个分区节点的两表数据,分别进⾏排序;
1. merge阶段:对排好序的两张分区表数据执⾏join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,
否则取更⼩⼀边,见下图⽰意:
经过上⽂的分析,可以明确每种Join算法都有⾃⼰的适⽤场景,数据仓库设计时最好避免⼤表与⼤表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调⼤,让更多join实际执⾏为broadcast hash join。
声明:本号所有⽂章除特殊注明,都为原创,读者拥有优先阅读权,未经作者本⼈允许不得转载,否则追究侵权责任。
关注我的,后台回复【JAVAPDF】获取200页⾯试题!5万⼈关注的⼤数据成神之路,不来了解⼀下吗?5万⼈关注的⼤数据成关注我的,后台回复【JAVAPDF】获取200页⾯试题!
神之路,真的不来了解⼀下吗?5万⼈关注的⼤数据成神之路,确定真的不来了解⼀下吗?
欢迎您关注《⼤数据成神之路》
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论