⼤表join⼤表_四⼗三、SparkSQL两种join详解:
BroadcastHashJo。。。
SparkSQL的3种Join实现原理
Join操作是在传统的关系型数据查询中最常⽤的操作,关系型数据库在设计表时考虑到表的3NF范式设
计,为了更好将数据分散在不同的表中以减少数据的冗余与减少更新容错等机制,⽽建⽴表之间关系的最好⽅式就是关联两张表,即所谓的表之间的join操作。
SparkSQL从Shark到SparkSQL,提供了SQL的实现, 完全兼容HQL语法,并且借鉴了关系型数据库的优化器原理,在分布式的环境下对SparkSQL的SQL引擎(catalyst)进⾏了极⼤的优化,其中,join作为SparkSQL中的常见操作,⾃然Spark对它进⾏了很⼤的优化。在SparkSQL中,对于join的实现主要有两种:BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin是Spark对BroadcastJoin与Shufle Hash Join进⾏的优化,下⾯来分别介绍这三种join的原理:
⼀、Broadcast Join
sql left join 多表连接这种join⽅式来源于传统的基于维度的建模(Kinball), ⽐如说星型或雪花模型,表分为维度表和事实表。维表⼀般指相对来说⽐较固定的表,例如产品维表,客户维表等。⽽事实表是业务表,表的数据会随着业务和时间的增长⽽不断发⽣增删改操作,例如产品的销售表,订单表等。
传统关系型数据库中,对于两张表的等值join, 是对两张表中的相同的字段进⾏连接。在SparkSQL中,对两张表做join最直接的⽅式是先根据要join的key进⾏分区,再在每个分区中把key相同的记录拿出来做连接操作,因为是分布式的计算,就会不可避免地涉及到数据的shuffle, ⽽shufle在Spark中是耗时耗资源(IO与⽹络带宽)的操作,所以在设计时应避免⼤量的shuffle操作。
⽐如,在数据仓库中维表与事实有进⾏join操作时,为了避免shuffle, 可以将容量较⼩的维表全部数据分发到每个Executor中,供事实表使⽤。这就会在⼀定程度上牺牲了⼀定的Executor空间来换取shuffle可能带来的耗时操作,这就是SparkSQL中的Broadcast Join, 其主要原理如下图所⽰:
Broadcast Join
Broadcast Join需要满⾜以下⼏个条件
被⼴播的表需要⼩于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M。或者在SQL语句中加⼊broadcast joinr的hint。
基表不能被⼴播出去,⽐如A表leftOuterJoin表B时,只能⼴播右边的表B;
Broadcast Join适⽤的场景及缺点
Broadcast Join只能⼴播较⼩的表,否则数据的冗余传输就远⼤于shuffle的开销;
只适合⼴播较⼩的表,需要将被⼴播的表collect到Driver端,会对Driver端与⽹络造成⼀定的开销;
⼆、Shuffle Hash Join
在上⾯的场景中,当两张表join操作时,选择将较⼩的表⼴播出去以避免shuffle,提⾼性能。正如上⾯所说,要⼴播的表⾸先要collect到Driver端,然后每个Executor都会分发⼀份, 所以当表⽐较⼤时,采⽤broadcast join会对Driver端和Executor端造成较⼤压⼒。
由于Spark是⼀个分布式的计算引擎,可以通过分区的⽅式将⼤批量的数据根据分区的个数划分成较⼩的数据集进⾏并⾏计算,利⽤相同的key必然被分到同⼀个区的原理,SparkSQL将⼤表之间的join操作分⽽治之,先将表按分区划分,再对两个表中相对应分区的数据分别进⾏Hash Join, 这样会在⼀定程度上减少Driver端的压⼒,也减少了Executor端读取整个⼴播表的内存消耗。它的原理图如下:
Shuffle Hash Join
Shuffle Hash Join的主要步骤
两张表分别按照连接的key按照HashPartitioner进⾏重新分区,即Shuffle操作,⽬的是让两张表相同的keys去往相同的分区中;
对分区中的数据进⾏join操作,此处先将⼩表分区构造成⼀张Hash表,然后根据⼤表分区中的记录的keys值拿出来进⾏匹配;
Shuffle Hash Join的条件
分区的平均⼤⼩不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认的值的⼤⼩是10M;
在⼀定⼤⼩的表中,SparkSQL结合空间与时间复杂度来对两张表进⾏重新分区,并且对较⼩表中的分区进⾏hash化,从⽽完成join操作。可以在⼀定复杂度的基础上,减少Driver端和Executor端的内存压⼒,提供了计算时的稳定性;
三、Sort Merge Join
上⾯介绍的两种join操作对于⼀定⼤⼩的表⽐较适⽤,但对于做连接操作的两个表都⾮常⼤时,⽆论选择上⾯哪种对会对计算的资源(内存与⽹络)造成很⼤的压⼒,这主要因为做join时两上⾯两种⽅法都是hash join, 是将⼀侧的数据完全加载到内存中,通过对要join操作的keys取hash code做等值join操作。
当两个表的都⾮常⼤时,SparkSQL采⽤了⼀种全新的⽅案对表进⾏join操作,即Sort Merge Join。这种实现⽅式不⽤将⼀边的表全部加载后再进⾏Hash Join, 但需要在join前将数据排序,如下图所⽰:
Sort Merge Join
从上图可以看出,两张⼤表都按照join的keys进⾏了重新shuffle, 保证keys值相同的记录会被分在相应的分区上。分区以后对每个分区内的数据进⾏排序,排序后再对相应的分区内的数据进⾏join操作。
四、SparkSQL的join机制
在SparkSQL中,前两种机制被合并成BroadcastHashJoin, 所以在SparkSQL⽣成的物理计划中有BroadcastHashJoin和SortMergeJoin两种机制。
五、代码试验
启动Spark的standlone集,启动spark-shell, 打开spark的WebUI
启动spark-shell
/
/导⼊隐式转换scala> import spark.implicits._import spark.implicits._//导⼊SparkSQL的function模块scala> import org.apache.spark.sql.functions._import org.apach
通过以上的代码我们可以发现,对于两张⼩表,SparkSQL默认的连接⽅式是BroadcastHashJoin, 具体的join操作我们可以从Spark的
WebUI上查看,如下图所⽰:
BroadcastHashJoin
如果在⽣成df1时对df1进⾏持久化,我们可以看看⽣成的物理计划中的"LocalTableScan"变成了“InMemoryTbleScan":
//对df1在join前执⾏cache操作scala> df1.cache().count()res2: Long = 5scala> val r1 = df1.join(df2, $"id" === $"sid")r1: org.apache.spark.sql.DataFrame = [id: int, n
查看webui, 可以看到还是BroadcastHashJoin
如果配置禁⽤BroadcastHashJoin, 我们看看会发⽣什么样的变化
//配置不使⽤BroadcastHashJoin, 在session级别配置scala> f.set("spark.sql.autoBroadcastJoinThreshold", -1)scala> val df1 = Seq( | (1, "章⽂"), 我们可以看出,如果设置了f.set("spark.sql.autoBroadcastJoinThreshold", -1), 即不做10M⼤⼩的限制,SparkSQL选择使
⽤了Sort Merget Join:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论