Spark原理-SparkSql框架优化策略
有了SparkCore为什么还要有SparkSql呢?
有两⼤原因:
⼀是SparkCore只能⽤Api,这就把很多SqlBoy拒之门外,Spark就⽆法发扬光⼤了;
⼆是使⽤Api时⽤户编写的函数作为⼀个个闭包被序列化后分发到Executor执⾏,Spark⽆法对⽤户⾃定义的代码进⾏优化;
基于以上原因,SparkSql横空出世,并提供强⼤的、⼀篮⼦的优化⽅案,以便使⽤户专注于业务需求的实现,把性能优化交给spark框架。SparkSql提供如下优化措施:
⼀,Catalyst 优化器
Catalyst 主要负责三个⼯作:
⼀是根据Sql⽣成语法树(执⾏计划);
⼆是执⾏计划的逻辑优化;
三是执⾏计划的物理优化
以如下Sql为例,来看看Catalyst 是如何⼯作的:
select
student_id,
count(1)
from
(
select
student_id,
age
from
score
where
age >10
)tmp
left join score
on
score.student_id = tmp.student_id
where score >60
group by
tmp.student_id
⾸先,⽣成逻辑语法树
逻辑语法树也可以称之为执⾏计划,如下:
这是Catalyst 解析出来的语法树,也可以认为是执⾏计划,从上往下并⾏执⾏。
按照这个计划,其实就可以去执⾏了,但是Catalyst 还会对其进⾏优化。
其次,执⾏计划的逻辑优化
从图上可以看出,有⼏个点是可以优化的:
⼀是对score > 60的过滤时机可以提前,这样的话,score < 60的数据就不会加载到内存中。
⼆是student和score表中可能有⼤量字段,但sql中只⽤到student_id、score、age这⼏个字段,其他字段⽆需加载到内存中。
以上分别对应着Catalyst 的两个优化⼿段:谓词下推、列裁剪。
像谓词下推、列剪枝这样的特性,都被称为启发式的规则或策略。⽽ Catalyst 优化器的核⼼职责之⼀,就是在逻辑优化阶段,基于启发式的规则和策略调整、优化执⾏计划。经过逻辑阶段的优化之后,执⾏计划如下:
逻辑优化是基于规则的优化,是静态的,Spark并没有到此为⽌,接下来会对执⾏计划进⾏动态的调整优化,动态调整的依据是运⾏中的数据统计。
第三,执⾏计划的物理优化
在执⾏过程中,Spark会根据数据集的⼤⼩进⾏计算策略的调整,以join为例,会根据数据集的⼤⼩选择Join的⽅式、数据分发的⽅式,⽐如有⼀个数据集⽐较⼩,会选择Broadcast⽅式将数据分发出去,节省⽹络分发的时间,提⾼性能。
⼆,Tungsten
继Catalyst 的优化之后,Tungsten ⼜出场了,其主要在数据结构和执⾏代码⽅⾯进⾏优化,主要的⽬的是为了更⾼效率的利⽤内存和CPU,如将空间利⽤率的java对象变为UnsafeRow;为了减少昂贵的⽅法调⽤,将⼀个Stage多个算⼦整合为⼀个函数;
1,Unsafe Row。
对于每条数据记录,Spark SQL默认采⽤Row 对象来进⾏封装和存储。Java对象是⼀种空间利⽤率不⾼的存储,⽐如与数据本⾝⽆关的对象头信息,为了补⾜长度的对齐部分,会产⽣占⽤相当可观的额外空间。
针对这个问题,Tungsten 设计并实现了⼀种叫做 Unsafe Row 的⼆进制数据结构。Unsafe Row是⼀种⼆进制数据结构,以⾮常紧凑的结构存储数据,如下所⽰:
Unsafe Row避免了⼤量的额外信息的存储,极⼤的提⾼了空间利⽤率,对于Spark这种重度内存依赖型计算引擎,有⾮常⼤的性能提升作⽤。sql优化的几种方式
通常我们在写代码的过程中,并不经常直接使⽤Unsafe Row,Spark计算产⽣的中间结果和输出会使⽤到。
参考:
2,全阶段代码⽣成(WSCG,Whole Stage Code Generation)
对于同⼀个Stage的多个算⼦,本质上是多个函数的链式调⽤,伴随着很多基本类型的装箱操作,Tungsten对这些代码进⾏分析,将多个函数融合为⼀个函数,将多次输⼊输出变为⼀次输⼊输出,减少了函数调⽤和参数封装。
三,AQE(Adaptive Query Execution)
AQE 的全称是 Adaptive Query Execution,⾃适应查询执⾏。
AQE主要是针对Shuffle进⾏的优化,包含了 3 个动态优化特性:
Join 策略调整
⾃动分区合并
⾃动倾斜处理
1,开启
AQE 机制默认是未开启的,要想充分利⽤上述的 3 个特性,通过如下配置开启:
spark.abled=true
2, Join 策略调整
在运⾏的过程中,AQE会动态跟踪数据的变化,如A/B两个表Join,如果这两个表都是⼤表,在⽣成执⾏计划时,只能选择Shuffle Join,但后续对B进⾏过滤或者聚合后,数据量⼤幅减少,AQE将会动态的将Shuffle Join调整为Broadcast Join。
这⾥看的出AQE的Join调整策略依赖于Shuffle的中间⽂件,因为其需要根据中间⽂件的⼤⼩去决策是
否调整Join⽅式。
3,⾃动分区合并
在Shuffle Write结束后,可能会产⽣很多数据量⾮常⼩的分区,并⾏度⾼但分区⼩会导致⼤量的调度,反⽽不利于作业的执⾏。
AQE会对⼩分区进⾏合并,多个⼩分区合并为⼀个⼤分区,减少Reduce阶段的并⾏度。
涉及到⾃动分区合并的有两个参数:
spark.sql.adaptive.advisoryPartitionSizeInBytes 默认64MB
spark.alescePartitions.minPartitionNum,最⼩分区数,默认spark集的默认并⾏度
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论