Spark设计理念和基本架构
Spark设计理念和基本架构
Spark是⼀个通⽤的并⾏计算框架,由加州伯克利⼤学(UC Berkeley) 的AMP实验室开发于2009年,并于2010年开源,2013年成长为Apache旗下在⼤数据领域最活跃的开源项⽬之⼀。
虽然Spark是⼀个通⽤的并⾏计算框架,但是Spark本质上也是⼀个基于map-reduce算法模型实现的分布式计算框架,Spark不仅拥有了Hadoop MapReduce的能⼒和优点,还解决了Hadoop MapReduce中的诸多性能缺陷。
HadoopMapReduce的问题与演进
早期的Hadoop MapReduce采⽤的是MRv1版本的MapReduce编程模型。MRv1具体实现可以参考org.apache.hadoop.mapred包。MRv1的Map和Reduce都是通过接⼝实现的。MRv1主要包括以下三个部分:
1)运⾏时环境(JobTracker和TaskTracker)
2)编程模型(MapReduce)
3)数据处理引擎(Map任务和Reduce任务)
MRv1将集管理功能和数据处理能⼒紧耦合在⼀起,如下图所⽰:
这种紧耦合的设计会导致以下问题:
1)可扩展性差:在运⾏时,JobTracker既负责资源管理,⼜负责任务调度,当集繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。
2)可⽤性差:采⽤了单节点的Master,没有备⽤Master及选举操作,这导致⼀旦Master出现故障,整个集将不可⽤。
3)资源利⽤率低:TaskTracker使⽤slot等量划分本节点上的资源量。slot代表计算资源(CPU、内存等)。任务需要获取到slot后才能运⾏,Hadoop调度器负责将各个TaskTracker上的空闲slot分配给Task使⽤。即使⼀些Task不能充分利⽤slot所代表的资源,其他Task 也⽆法使⽤这些空闲的资源。在MRv1中,slot分为Map slot和Reduce slot两种,分别供MapTask和Reduce Task使⽤。有时会出现因为作业刚刚启动等原因导致MapTask很多,⽽Reduce Task任务还没有调度的情况,这时Reduce slot就会被闲置。
4)⽆法⽀持多种MapReduce框架:⽆法通过可插拔⽅式将⾃⾝的MapReduce框架替换为其他实现,
如Spark、Storm等。
Apache社区为了解决上述问题,对Hadoop MRv1进⾏改造,将集管理和数据处理进⾏解耦,演进出MRv2。在MRv2中,MRv1所包含的两⼤功能-集管理和数据处理被解耦。负责集管理的JobTracker和TaskTracker被重构为通⽤的资源管理器(资源调度平
台)ResourceManager(RM)、节点管理器NodeManager(NM)和负责各个计算框架的任务调度模型ApplicationMaster(AM)。在MRv2中,资源调度采⽤两级调度⽅案,ResourceManager负责整个集的资源管理,并将NodeManager汇报的空闲资源封装成container提供给ApplicationMaster完成第⼀级调度。⽽负责计算框架任务调度的ApplicationMaster则根据实际应⽤的具体情况进⾏第⼆级资源调度。⼆级调度的设计⼤⼤减少了ResourceManager的压⼒。NodeManager负责对单个节点的资源管理,并将资源信息、Container运⾏状态、健康状况等信息上报给ResourceManager。ResourceManager为了保证Container的利⽤率,会监控Container,如果Container未在有限的时间内使⽤,ResourceManager将命令NodeManager“杀死”Container,以便将资源分配给其他任务。经过将集资源管理和数据处理解耦后,MRv2的核⼼不再是MapReduce框架,⽽是YARN集管理器。因为在以YARN为核⼼的MRv2中,MapReduce框架是可插拔的,完全可以替换为其他MapReduce实现,⽐如Spark、Storm等。MRv2的⽰意图如下所⽰:
虽然Hadoop MRv2解决了MRv1中的⼀些问题,但是由于对HDFS的频繁操作(包括计算结果持久化、数据备份、资源下载及shuffle 等),导致磁盘I/O成为系统性能的瓶颈,因此只适⽤于离线数据处理或批处理,⽽不能⽀持对迭代式、流式(实时式)数据的处理。
Spark对Hadoop的优化与改进
Spark的作者看到了MRv1的问题,并对MapReduce做了⼤量的改进和优化,主要包括以下5个⽅⾯:
1)减少磁盘I/O:
中间结果缓存在内存中:随着实时⼤数据应⽤越来越多,Hadoop作为离线的⾼吞吐、低响应框架已不能满⾜这类需求。Hadoop MapReduce的map端将中间输出和结果存储在磁盘中,reduce端⼜需要从磁盘读写中间结果,从⽽造成磁盘I/O成为瓶颈。Spark则允许将map端的中间输出和结果缓存在内存中,从⽽使得reduce端在拉取中间结果时避免了⼤量的磁盘I/O。
应⽤程序上传的资源⽂件缓存在Driver本地⽂件服务的内存中:Hadoop YARN中的ApplicationMaster申请到Container后,具体任务需要利⽤NodeManager从HDFS的不同节点下载任务所需的资源(如Jar包),增加了磁盘I/O。Spark则将应⽤程序上传的资源⽂件缓存在Driver本地⽂件服务的内存中,当Executor执⾏任务时直接从Driver的内存中读取,从⽽节省了⼤量的磁盘I/O。
2)增加任务并⾏度:由于将中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop将它们简单地通过串⾏执⾏衔接起来。⽽Spark则把不同的环节抽象为Stage,允许多个Stage既可以串⾏执⾏,⼜可以并⾏执⾏。
3)避免重新计算:当Stage中某个分区的Task执⾏失败后,会重新对此Stage调度,但在重新调度的时候会过滤已经执⾏成功的分区任务,所以不会造成重复计算和资源浪费。
4)可选的Shuffle排序:Hadoop MapReduce在Shuffle之前会将中间结果按key的hash值和key值⼤⼩进⾏两层排序,确保分区内部的有序性。⽽Spark则可以根据不同场景选择在map端排序还是reduce端排序。
5)灵活的内存管理策略:Spark将内存分为堆上的存储内存、堆外的存储内存、堆上的执⾏内存、堆外的执⾏内存4个部分。Spark既提供了执⾏内存和存储内存之间固定边界的实现,⼜提供了执⾏内存和存储内存之间“软”边界的实现。Spark默认使⽤“软”边界的实现,执⾏内存或存储内存中的任意⼀⽅在资源不⾜时都可以借⽤另⼀⽅的内存,最⼤限度地提⾼资源的利⽤率,减少对资源的浪费。Spark由于对内存使⽤的偏好,内存资源的多寡和使⽤率就显得尤为重要,为此Spark的内存管理器提供的Tungsten实现了⼀种与操作系统的内存Page ⾮常相似的数据结构,⽤于直接操作操作系统内存,节省了创建的Java对象在堆中占⽤的内存,使得Spark对内存的使⽤效率更加接近硬件。Spark会给每个
Task分配⼀个配套的任务内存管理器,对Task粒度的内存进⾏管理。Task的内存可以被多个内部的消费者消费,任务内存管理器对每个消费者进⾏Task内存的分配与管理,因此Spark对内存有着更细粒度的管理。
除了上述的改进外,Spark还具有以下特点:
1)检查点⽀持:Spark的RDD之间维护了⾎缘关系(lineage),⼀旦某个RDD失败了,则可以由⽗RDD重建。虽然lineage可⽤于错误后RDD的恢复,但对于很长的lineage来说,恢复过程⾮常耗时。如果应⽤启⽤了检查点,那么在Stage中的Task都执⾏成功
后,SparkContext将把RDD计算的结果保存到检查点,这样当某个RDD执⾏失败后,再由⽗RDD重建时就不需要重新计算,⽽直接从检查点恢复数据。
2)易于使⽤。Spark现在⽀持Java、Scala、Python和R等语⾔编写应⽤程序,⼤⼤降低了使⽤者的门槛。除此之外,还⾃带了80多个⾼等级操作符,允许在Scala、Python、R的shell中进⾏交互式查询。
3)⽀持交互式:Spark使⽤Scala开发,并借助于Scala类库中的Iloop实现交互式shell,提供对REPL(Read-eval-print-loop)的实现。
4)⽀持SQL查询。在数据查询⽅⾯,Spark⽀持SQL及Hive SQL,这极⼤地⽅便了传统SQL开发和数
据仓库的使⽤者。
5)⽀持流式计算:与MapReduce只能处理离线数据相⽐,Spark还⽀持实时的流计算。Spark依赖Spark Streaming对数据进⾏实时的处理,其流式处理能⼒还要强于Storm。
6)⾼可⽤:Spark⾃⾝实现了Standalone部署模式,此模式下的Master可以有多个,解决了单点故障问题。Spark也完全⽀持使⽤外部的部署模式,⽐如YARN、Mesos、EC2等。
7)丰富的数据源⽀持:Spark除了可以访问操作系统⾃⾝的⽂件系统和HDFS之外,还可以访问Kafka、Socket、Cassandra、HBase、Hive、Alluxio(Tachyon)及任何Hadoop的数据源。
8)丰富的⽂件格式⽀持:Spark⽀持⽂本⽂件格式、CSV⽂件格式、JSON⽂件格式、ORC⽂件格式、Parquet⽂件格式、Libsvm⽂件格式,有利于Spark与其他数据处理平台的对接。
Spark基础概念
RDD(resillient distributed dataset):弹性分布式数据集。Spark应⽤程序通过使⽤Spark的转换API,可以将RDD封装为⼀系列具有⾎缘关系的RDD,也就是DAG。只有通过Spark的动作API才会将RDD及其DAG提交到DAGScheduler。RDD的祖先⼀定是⼀个跟数据源相关的RDD,负责从数据源迭代读取数据。
DAG(Directed Acycle graph):有向⽆环图。在图论中,如果⼀个有向图⽆法从某个顶点出发经过若⼲条边回到该点,则这个图是⼀个有向⽆环图(DAG图)。Spark使⽤DAG来反映各RDD之间的依赖或⾎缘关系。 Partition:数据分区,即⼀个RDD的数据可以划分为多少个分区。Spark根据Partition的数量来确定Task的数量。
NarrowDependency:窄依赖,即⼦RDD依赖于⽗RDD中固定的Partition。Narrow-Dependency分为OneToOneDependency和RangeDependency两种。
ShuffleDependency:Shuffle依赖,也称为宽依赖,即⼦RDD对⽗RDD中的所有Partition都可能产⽣依赖。⼦RDD对⽗RDD各个Partition的依赖将取决于分区计算器(Partitioner)的算法。
Job:⽤户提交的作业。当RDD及其DAG被提交给DAGScheduler调度后,DAGScheduler会将所有RDD中的转换及动作视为⼀个Job。⼀个Job由⼀到多个Task组成。
Stage:Job的执⾏阶段。DAGScheduler按照ShuffleDependency作为Stage的划分节点对RDD的DAG进⾏Stage划分(上游的Stage 将为ShuffleMapStage)。因此⼀个Job可能被划分为⼀到多个Stage。Stage分为ShuffleMapStage和ResultStage两种。
Task:具体执⾏任务。⼀个Job在每个Stage内都会按照RDD的Partition数量,创建多个Task。Task分
为ShuffleMapTask和ResultTask两种。ShuffleMapStage中的Task为ShuffleMapTask,⽽ResultStage中的Task为ResultTask。ShuffleMapTask和ResultTask类似于Hadoop中的Map任务和Reduce任务。
Shuffle:Shuffle是所有MapReduce计算框架的核⼼执⾏阶段,Shuffle⽤于打通map任务(在Spark中就是ShuffleMapTask)的输出与reduce任务(在Spark中就是ResultTask)的输⼊,map任务的中间输出结果按照指定的分区策略(例如,按照key值哈希)分配给处理某⼀个分区的reduce任务。
Spark基本组成与架构
Apache Spark由SparkCore、Spark SQL、Spark Streaming、GraphX、MLlib等模块组成。模块间的整体关系如下图所⽰:
其中Spark Core是Apache Spark的核⼼,是其他扩展模块的基础运⾏时环境。下⾯我们简要描述SparkCore的功能和其他扩展模块的功能。
Spark Core,主要提供Spark应⽤的运⾏时环境,包括以下功能:
基础设施:
SparkConf:⽤于管理Spark应⽤程序的各种配置信息;
内置的基于Netty的RPC框架,包括同步和异步的多种实现。RCP框架时Spark各组件间通信的基础;
事件总线: SparkContext内部各组件间使⽤事件—模式异步调⽤的实现;
度量系统:由Spark中的多种度量源(Source)和多种度量输出(Sink)构成,完成对整个Spark集中各组件运⾏期状态的监控;SparkContext:通常⽽⾔,⽤户开发的Spark应⽤程序的提交与执⾏都离不开SparkContex的⽀持。在正式提交应⽤程序之前,⾸先需要初始化SparkContext。SparkContext隐藏了⽹络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、⽂件服务、Web UI等内容,应⽤程序开发者只需要使⽤SparkContext提供的API完成功能开发。
SparkEnv:Spark执⾏环境SparkEnv是Spark中的Task运⾏所必需的组件。SparkEnv内部封装了RPC环境(RpcEnv)、序列化管理器、⼴播管理器(BroadcastManager)、map任务输出跟踪器(MapOutputTracker)、存储体系、度量系统(MetricsSystem)、输出提交协调器(OutputCommitCoordinator)等Task运⾏所需的各种组件。
存储体系:Spark优先考虑使⽤各节点的内存作为存储,当内存不⾜时才会考虑使⽤磁盘,这极⼤地减少了磁盘I/O,提升了任务执⾏的效率,使得Spark适⽤于实时计算、迭代计算、流式计算等场景。在实际场景中,有些Task是存储密集型的,有些则是计算密集型的,所以有时候会造成存储空间很空闲,⽽计算空间的资源⼜很紧张。Spark的内存存储空间与执⾏存储空间之间的边界可以是“软”边界,
因此资源紧张的⼀⽅可以借⽤另⼀⽅的空间,这既可以有效利⽤资源,⼜可以提⾼Task的执⾏效率。此外,Spark的内存空间还提供了Tungsten的实现,直接操作操作系统的内存。由于Tungsten省去了在堆内分配Java对象,因此能更加有效地利⽤系统的内存资源,并且因为直接操作系统内存,空间的分配和释放也更迅速。
调度系统:调度系统主要由DAGScheduler和TaskScheduler组成,它们都内置在SparkContext中。DAGScheduler负责创建Job、将DAG中的RDD划分到不同的Stage、给Stage创建对应的Task、批量提交Task等功能。TaskScheduler负责按照FIFO或者FAIR等调度算法对批量Task进⾏调度;为Task分配资源;将Task发送到集管理器的当前应⽤的Executor上,由Executor负责执⾏等⼯作。即使现在Spark增加了SparkSession和DataFrame等新的API,但这些新API的底层实际依然依赖于SparkContext。
计算引擎:计算引擎由内存管理器(MemoryManager)、Tungsten、任务内存管理器(TaskMemory-Manager)、Task、外部排序器(ExternalSorter)、Shuffle管理器(ShuffleManager)等组成。
MemoryManager除了对存储体系中的存储内存提供⽀持和管理外,还为计算引擎中的执⾏内存提供⽀持和管理。
Tungsten除⽤于存储外,也可以⽤于计算或执⾏。
TaskMemoryManager对分配给单个Task的内存资源进⾏更细粒度的管理和控制。
ExternalSorter⽤于在map端或reduce端对ShuffleMapTask计算得到的中间结果进⾏排序、聚合等操作。
ShuffleManager⽤于将各个分区对应的ShuffleMapTask产⽣的中间结果持久化到磁盘,并在reduce端按照分区远程拉取ShuffleMapTask产⽣的中间结果。
Spark SQL: 由于SQL具有普及率⾼、学习成本低等特点,为了扩⼤Spark的应⽤⾯,还增加了对SQL及Hive的⽀持。Spark SQL的过程可以总结为:⾸先使⽤SQL语句解析器(SqlParser)将SQL转换为语法树(Tree),并且使⽤规则执⾏器(RuleExecutor)将⼀系列规则(Rule)应⽤到语法树,最终⽣成物理执⾏计划并执⾏的过程。其中,规则包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执⾏过程与SQL类似。
Spark Streaming: Spark Streaming与Apache Storm类似,也⽤于流式计算。Spark Streaming⽀持Kafka、Flume、Kinesis和简单的TCP套接字等多种数据输⼊源。输⼊流接收器(Receiver)负责接⼊数据,是接⼊数据流的接⼝规范。Dstream是Spark Streaming 中所有数据流的抽象,Dstream可以被组织为DStream Graph。Dstream本质上由⼀系列连续的RDD组成。
GraphX: Spark提供的分布式图计算框架。GraphX主要遵循整体同步并⾏计算模式(Bulk SynchronousParallell,BSP)下的Pregel 模型实现。GraphX提供了对图Graph的抽象,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr和dstAttr,⽤来保存源顶点和⽬的顶点的属性)三种结构组成。GraphX⽬前已经封装了最短路径、⽹页排名、连接组件、三⾓关系统计等算法的实现,⽤户可以选择使⽤。
MLlib: Spark提供的机器学习框架。机器学习是⼀门涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib⽬前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预⾔模型标记语⾔、管道等多种数理统计、概率论、数据挖掘⽅⾯的数学算法。
Spark编程模型
Spark应⽤程序从编写到提交、执⾏、输出的整个过程如下图所⽰:
步骤如下:
1)⽤户使⽤SparkContext提供的API编写Driver应⽤程序。有时我们会使⽤SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext等提供的API编写Driver应⽤程序,其
实SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext都对SparkContext进⾏了封装,并提供了DataFrame、SQL、Hive以及流式计算相关的API。
2)使⽤SparkContext提交的⽤户应⽤程序:
⾸先会通过RpcEnv向集管理器(Cluster Manager)注册应⽤(Application)并且告知集管理器需要的资源数量。
集管理器根据Application的需求,给Application分配Executor资源,并在Worker上启动CoarseGrainedExecutorBackend进程(该进程内部将创建Executor)。
Executor所在的CoarseGrainedExecutorBackend进程在启动的过程中将通过RpcEnv直接向Driver注册Executor的资源信
息,TaskScheduler将保存已经分配给应⽤的Executor资源的地址、⼤⼩等相关信息。
scala不是内部或外部命令SparkContext根据各种转换API,构建RDD之间的⾎缘关系和DAG,RDD构成的DAG将最终提交给DAGScheduler。
DAGScheduler给提交的DAG创建Job,并根据RDD的依赖性质将DAG划分为不同的Stage。DAGScheduler根据Stage内RDD的Partition数量创建多个Task并批量提交给TaskScheduler。
TaskScheduler对批量的Task按照FIFO或FAIR调度算法进⾏调度,然后给Task分配Executor资源
最后将Task发送给Executor由Executor执⾏。此外,SparkContext还会在RDD转换开始之前使⽤BlockManager和BroadcastManager将任务的Hadoop配置进⾏⼴播。
3)集管理器(Cluster Manager)会根据应⽤的需求,给应⽤分配资源,即将具体任务分配到不同Worker节点上的多个Executor来处理任务的运⾏。Standalone、YARN、Mesos、EC2等都可以作为Spark的集管理器。
4)Task在运⾏的过程中需要对⼀些数据(如中间结果、检查点等)进⾏持久化,Spark⽀持选择HDFS、Amazon S3、Alluxio(原名叫Tachyon)等作为存储。
Spark集架构
从集部署的⾓度看,Spark集由集管理器(Cluster Manager)、⼯作节点(Worker)、执⾏器(Executor)、驱动器(Driver)、应⽤程序(Application)等部分组成,其整体关系如下图所⽰:
1)Cluster Manager:Spark的集管理器,主要负责对整个集资源的分配与管理。ClusterManager在YARN部署模式下为ResourceManager;在Mesos部署模式下为Mesos Master;在Standalone部署模式下为Master。Cluster Manager分配的资源属于⼀级分配,它将各个Worker上的内存、CPU等资
源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。⽬前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集管理器。
2)Worker:Spark的⼯作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下⼯作:将⾃⼰的内存、CPU 等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进⼀步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程(此进程会创建Executor实例)。
3)Executor:主要负责任务的执⾏及与Worker、Driver的信息同步。
4)Driver: Application的驱动程序,Application通过Driver与Cluster Manager、Executor进⾏通信。Driver可以运⾏在Application 中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运⾏。
5)Application:⽤户使⽤Spark提供的API编写的应⽤程序,Application通过Spark API将进⾏RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。Cluster Manager将会根据Application的资源需求,通过⼀级分配将Executor、内存、CPU等资源分配给Application。Driver通过
⼆级分配将Executor等资源分配给每⼀个任务,Application最后通过Driver告诉Executor运⾏任务。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论