SparkSQL源码剖析(⼀)SQL解析框架Catalyst流程概述
Spark SQL模块,主要就是处理跟SQL解析相关的⼀些内容,说得更通俗点就是怎么把⼀个SQL语句解析成Dataframe或者说RDD的任务。以Spark 2.4.3为例,Spark SQL这个⼤模块分为三个⼦模块,如下图所⽰
其中Catalyst可以说是Spark内部专门⽤来解析SQL的⼀个框架,在Hive中类似的框架是Calcite(将SQL解析成MapReduce任务)。Catalyst将SQL 解析任务分成好⼏个阶段,这个在对应的论⽂中讲述得⽐较清楚,本系列很多内容也会参考论⽂,有兴趣阅读原论⽂的可以到这⾥看:。
⽽Core模块其实就是Spark SQL主要解析的流程,当然这个过程中会去调⽤Catalyst的⼀些内容。这模块⾥⾯⽐较常⽤的类包括SparkSession,DataSet等。
⾄于hive模块,这个不⽤说,肯定跟hive有关的。这个模块在本系列基本不会涉及到,就不多介绍了。
值得⼀提的是,论⽂发表的时候还是在Spark1.x阶段,那个时候SQL解析成词法树⽤的是scala写的⼀个解析⼯具,到2.x阶段改为使⽤antlr4来做这部分⼯作(这应该算是最⼤的改变)。⾄于为什么要改,我猜是出于可读性和易⽤性⽅⾯的考虑,当然这个仅是个⼈猜测。
另外,这⼀系列会简单介绍⼀条SQL语句的处理流程,基于spark 2.4.3(sql这个模块在spark2.1后变化不⼤)。这⼀篇先从整体介绍Spark SQL出现的背景及解决问题,Dataframe API以及Catalyst的流程⼤概是怎么样,后⾯分阶段细说Catalyst的流程。
Spark SQL出现的背景及解决的问题
在最早的时候,⼤规模处理数据的技术是MapReduce,但这种框架执⾏效率太慢,进⾏⼀些关系型处理(如join)需要编写⼤量代码。后来hive这种框架可以让⽤户输⼊sql语句,⾃动进⾏优化并执⾏。
但在⼤型系统中,任然有两个主要问题,⼀个是ETL操作需要对接多个数据源。另⼀个是⽤户需要执⾏复杂分析,⽐如机器学习和图计算等。但传统的关系型处理系统中较难实现。
Spark SQL提供了两个⼦模块来解决这个问题,DataFrame API和Catalyst。
相⽐于RDD,Dataframe api提供了更加丰富的关系型api,并且能和RDD相互转换,后⾯Spark机器学习⽅⾯的⼯作重⼼,也从以RDD为基础的mllib 转移到以Dataframe为基础的Spark ML(虽然Dataframe底层也是RDD)。
另⼀个就是Catalyst,通过它可以轻松为诸如机器学习之类的域添加数据源(⽐如json或通过case class⾃定义的类型),优化规则和数据类型。
通过这两个模块,Spark SQL主要实现以下⽬标:
提供⽅便易⽤好的API,包括读取外部数据源,以及关系数据处理(⽤过的都知道)
使⽤已建⽴的DBMS技术提供⾼性能。
轻松⽀持新数据源,包括半结构化数据和外部数据库(⽐如MYSQL)。
图计算和机器学习⽅⾯的拓展
那下⾯就介绍Dataframe和Catalyst的流程,当然主要讨论的还是Catalyst。
统⼀API Dataframe
先来看看论⽂⾥⾯提供的⼀张图:
这张图可以说明很多,⾸先Spark的Dataframe API底层也是基于Spark的RDD。但与RDD不同的在于,Dataframe会持有schema(这个实在不好翻译,可以理解为数据的结构吧),以及可以执⾏各种各样的关系型操作,⽐如Select,Filter,Join,Groupby等。从操作上来说,和pandas的Dataframe有点像(连名字都是⼀样的)。
同时因为是基于RDD的,所以很多RDD的特性Dataframe都能够享受到,⽐如说分布式计算中⼀致性,可靠性⽅⾯的保证,以及可以通过cache缓存数据,提⾼计算性能啊等等。
同时图中页展⽰了Dataframe可以通过JDBC链接外部数据库,通过控制台操作(spark-shell),或者⽤户程序。说⽩了,就是Dataframe可以通过RDD转换⽽来,也可以通过外部数据表⽣成。
对了,这⾥顺便说⼀句,很多初次接触Spark SQL的童鞋可能会对Dataset和Dataframe这两个东西感到疑惑,在1.x时代它们确实有些差别,不过在spark2.x的时候,这两个API已经统⼀了。所以基本上Dataset和Dataframe可以看成是等价的东西。
最后还是结合代码做⼀下实际的展⽰吧,如下展⽰⽣成⼀个RDD,并且根据这个RDD⽣成对应的Dataframe,从中可以看出RDD和Dataframe的区别:
/
/⽣成RDD
scala> val data = sc.parallelize(Array((1,2),(3,4)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> data.foreach(println)
(1,2)
(3,4)
scala> val df = DF("fir","sec")
df: org.apache.spark.sql.DataFrame = [fir: int, sec: int]
scala> df.show()
+---+---+
|fir|sec|
+---+---+
|  1|  2|
|  3|  4|
scala不是内部或外部命令
+---+---+
//跟RDD相⽐,多了schema
scala> df.printSchema()
root
|-- fir: integer (nullable = false)
|-- sec: integer (nullable = false)
Catalyst流程解析
Catalyst在论⽂中被叫做优化器(Optimizer),这部分是论⽂⾥⾯较为核⼼的内容,不过其实流程还是蛮好理解的,依旧贴下论⽂⾥⾯的图。
主要流程⼤概可以分为以下⼏步:
1. Sql语句经过Antlr4解析,⽣成Unresolved Logical Plan(有使⽤过Antlr4的童鞋肯定对这⼀过程不陌⽣)
2. analyzer与catalog进⾏绑定(catlog存储元数据),⽣成Logical Plan;
3. optimizer对Logical Plan优化,⽣成Optimized LogicalPlan;
4. SparkPlan将Optimized LogicalPlan转换成 Physical Plan;
5. prepareForExecution()将 Physical Plan 转换成 executed Physical Plan;
6. execute()执⾏可执⾏物理计划,得到RDD;
提前说⼀下吧,上述流程多数是在org.apache.ution.QueryExecution这个类⾥⾯,这个贴⼀下简单的代码,看看就好,先不多做深究。后⾯的⽂章会详细介绍这⾥的内容。
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
......其他代码
//analyzer阶段
lazy val analyzed: LogicalPlan = {
SparkSession.setActiveSession(sparkSession)
sparkSession.uteAndCheck(logical)
}
//optimizer阶段
lazy val optimizedPlan: LogicalPlan = sparkSession.ute(withCachedData)
//SparkPlan阶段
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
//      but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
//prepareForExecution阶段
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/
/execute阶段
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = ute()
......其他代码
}
值得⼀提的是每个阶段都使⽤了lazy懒加载,对这块感兴趣可以看看我之前的⽂章。
上述主要介绍Spark SQL模块内容,其出现的背景以及主要解决问题。⽽后简单介绍下Dataframe API的内容,以及Spark SQL解析SQL的内部框架Catalyst。后续主要会介绍Catalyst中各个步骤的流程,结合源码来做⼀些分析。
以上~

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。