Spark基本概念及⼊门
spark
spark背景
什么是spark
Spark是⼀种快速、通⽤、可扩展的⼤数据分析引擎,2009年诞⽣于加州⼤学伯克利分校AMPLab,2010年开源,2013年6⽉成为Apache孵化项⽬,2014年2⽉成为Apache顶级项⽬。⽬前,Spark⽣态系统已经发展成为⼀个包含多个⼦项⽬的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等⼦项⽬,Spark是基于内存计算的⼤数据并⾏计算框架。Spark基于内存计算,提⾼了在⼤数据环境下数据处理的实时性,同时保证了⾼容错性和⾼可伸缩性,允许⽤户将Spark部署在⼤量廉价硬件之上,形成集。
Spark与Hadoop
Spark是⼀个计算框架,⽽Hadoop中包含计算框架MapReduce和分布式⽂件系统HDFS,Hadoop更⼴泛地说还包括在其⽣态系统上的其他系统.
为什么使⽤Spark?
Hadoop的MapReduce计算模型存在问题:
Hadoop的MapReduce的核⼼是Shuffle(洗牌).在整个Shuffle的过程中,⾄少产⽣6次I/O流.基于MapReduce计算引擎通常会将结果输出到次盘上,进⾏存储和容错.另外,当⼀些查询(如:hive)翻译到MapReduce任务是,往往会产⽣多个Stage,⽽这些Stage有依赖底层⽂件系统来存储每⼀个Stage的输出结果,⽽I/O的效率往往较低,从⽽影响MapReduce的运⾏速度.
Spark的特点: 快, 易⽤, 通⽤,兼容性
快:与Hadoop的MapReduce相⽐,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了⾼效的DAG执⾏引擎,可以通过基于内存来⾼效处理数据流。
易⽤:Spark⽀持Java、Python和Scala的API,还⽀持超过80种⾼级算法,使⽤户可以快速构建不同的应⽤。⽽且Spark⽀持交互式的Python和Scala的shell,可以⾮常⽅便地在这些shell中使⽤Spark集来验证解决问题的⽅法。
通⽤:Spark提供了统⼀的解决⽅案。Spark可以⽤于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同⼀个应⽤中⽆缝使⽤。Spark统⼀的解决⽅案⾮常具有吸引⼒,毕竟任何公司都想⽤统⼀的平台去处理遇到的问题,减少开发和维护的⼈⼒成本和部署平台的物⼒成本。
兼容性:Spark 可以⾮常⽅便地与其他的开源产品进⾏融合。⽐如,Spark 可以使⽤Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器.并且可以处理所有Hadoop ⽀持的数据,包括 HDFS、HBase 和 Cassandra 等。这对于已经部署Hadoop 集的⽤户特别重要,因为不需要做任何数据迁移就可以使⽤ Spark 的强⼤处理能⼒。Spark 也可以不依赖于第三⽅的资源管理和调度器,它实现了Standalone 作为其内置的资源管理和调度框架,这样进⼀步降低了 Spark 的使⽤门槛,使得所有⼈都可以⾮常容易地部署和使⽤ Spark。此外,Spark 还提供了在EC2 上部Standalone 的 Spark 集的⼯具。
Spark的⽣态系统
1.Spark Streaming:
Spark Streaming基于微批量⽅式的计算和处理,可以⽤于处理实时的流数据.它使⽤DStream,简单来说是⼀个弹性分布式数据集(RDD)系列,处理实时数据.数据可以从Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使⽤由⾼级函数(如 map,reduce,join 和 window)开发的复杂算法进⾏流数据处理。最后,处理后的数据可以被推送到⽂件系统,数据库和实时仪表板。
2.Spark SQL
SPark SQL可以通过JDBC API将Spark数据集暴露出去,⽽且还可以⽤传统的BI和可视化⼯具在Spark数据上执⾏类似SQL的查询,⽤户哈可以⽤Spark SQL对不同格式的数据(如Json, Parque以及数据库等)执⾏ETl,将其转化,然后暴露特定的查询.
3.Spark MLlib
MLlib是⼀个可扩展的Spark机器学习库,由通⽤的学习算法和⼯具组成,包括⼆元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。
4.Spark Graphx:
GraphX是⽤于图计算和并⾏图计算的新的(alpha)Spark API。通过引⼊弹性分布式属性图(Resilient Distributed Property Graph),⼀种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了⽀持图计算,GraphX暴露了⼀个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和⼀个经过优化的Pregel API变体。
此外,GraphX还包括⼀个持续增长的⽤于简化图分析任务的图算法和构建器集合。
5.Tachyon
Tachyon是⼀个以内存为中⼼的分布式⽂件系统,能够提供内存级别速度的跨集框架(如Spark和mapReduce)的可信⽂件共享.它将⼯作集⽂件缓存在内存中,从⽽避免到磁盘中加载需要经常读取的数据集,通过这⼀机制,不同的作业/查询和框架可以内存级的速度访问缓存⽂件.
此外,还有⼀些⽤于与其他产品集成的适配器,如Cassandra(Spark Cassandra 连接器)和R(SparkR)。Cassandra Connector可⽤于访问存储在Cassandra数据库中的数据并在这些数据上执⾏数据分析。
6.Mesos
Mesos是⼀个资源管理框架
提供类似于YARN的功能
⽤户可以在其中插件式地运⾏Spark,MapReduce,Tez等计算框架任务
Mesos对资源和任务进⾏隔离,并实现⾼效的资源任务调度
7.BlinkDB
BlinkDB是⼀个⽤于在海量数据上进⾏交互式SQL的近似查询引擎
允许⽤户通过查询准确性和查询时间之间做出权衡,完成近似查询
核⼼思想:通过⼀个⾃适应优化框架,随着时间的推移,从原始数据建⽴并维护⼀组多维样本,通过⼀个动态样本选择策略,选择⼀个适当⼤⼩的⽰例,然后基于查询的准确性和响应时间满⾜⽤户查询需求
除了这些库意外,还有⼀些其他的库,如Blink和Tachyon.
BlinkDB是⼀个近似查询引擎,⽤于海量数据执⾏交互式SQL查询.BlinkDB可以通过牺牲数据精度来提升查询响应时间.通过在数据样本上执⾏查询并展⽰包含有意义的错误线注解的结果,操作⼤数据集合.
Spark架构采⽤了分布式计算中的Master-Slave模型。Master是对应集中的含有Master进程的节点,Slave是集中含有Worker进程的节点。Master作为整个集的控制器,负责整个集的正常运⾏;Worker相当于是计算节点,接收主节点命令与进⾏状态汇报;Executor负责任务的执⾏;Client作为⽤户的客户端负责提交应⽤,Driver负责控制⼀个应⽤的执⾏.hadoop与spark的区别与联系
Spark集部署后,需要在主节点和从节点分别启动master进程和Worker进程,对整个集进⾏控制.在⼀个Spark应⽤的执⾏程序中.Driver和Worker是两个重要的⾓⾊.Driver程序是应⽤逻辑执⾏的起点,负责作业的调度,即Task任务的发布,⽽多个Worker⽤来管理计算节点和创建Executor并⾏处理任务.在执⾏阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器.同时Executor对相应数据分区的任务进⾏处理.
Sparkde架构中的基本组件:
ClusterManager:在standlone模式中即为Master(主节点),控制整个集.监控Worker.在Yarn模式中为资源管理器.
Worker:从节点,负责控制计算节点,启动Ex⽽粗投⼊或Driver
NodeManager:负责计算节点的控制。
Driver:运⾏Application的main() 函数并创建SparkContext
Executor: 执⾏器,在worker node上执⾏任务组件,⽤于启动线程执⾏任务.每个Application拥有独⽴的⼀组Executors
SparkContext: 整个应⽤的上下⽂,监控应⽤的⽣命周期
RDD:弹性分布式集合,spark的基本计算单元,⼀组RDD可形成执⾏的有向⽆环图RDD Graph
DAG Scheduler: 根据作业(Job)构建基于Stage的DAG,并交给Stage给TaskScheduler
TaskScheduler:将任务(Task)分发给Executor执⾏
SparkEnv:线程级别的上下⽂,存储运⾏时的重要组件的引⽤。SparkEnv内创建并包含如下⼀些重要组件的引⽤。
MapOutPutTracker:负责Shuffle元信息的存储。
BroadcastManager:负责⼴播变量的控制与元信息的存储。
BlockManager:负责存储管理、创建和查块。
MetricsSystem:监控运⾏时性能指标信息。
SparkConf:负责存储配置信息。
Spark的整体流程:client提交应⽤,Master到⼀个Worker启动Driver,Driver向Master或者向资源管理器申请资源,之后将应⽤转化为RDD Graph,再由DAGScheduler将RDD Graph转化为Stage的有向⽆环图提交给TaskScheduler,由TaskScheduler提交任务给Executor执⾏。在任务执⾏的过程中,其他组件协同⼯作,确保整个应⽤顺利执⾏。
搭建spark集
安装java环境,spark⾃动会把scala SDK打包到spark中⽆需安装scala环境
配置spark
$ cp $SPARK_HOME/conf/plate spark-env.sh
$ vim $SPARK_HOME/conf/spark-env.sh
添加
export JAVA_HOME=/usr/java/jdk1.8.0_191
#export SPARK_MASTER_IP=node-1
#export SPARK_MASTER_PORT=7077
$ cp $SPARK_HOME/plate slaves
$ vi slaves
# 在该⽂件中添加⼦节点所在的位置(Worker节点)
node-2
node-3
node-4
启动spark集
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-slaves.sh
启动后执⾏jps命令,主节点上有Master进程,其他⼦节点上有Work进⾏,登录Spark管理界⾯查看集状态(主节点):
到此为⽌,Spark集安装完毕,但是有⼀个很⼤的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动⾄少两个Master节点来实现⾼可靠,
配置⽅式⽐较简单:
Spark集规划:node-1,node-2是Master;node-3,node-4,node-5是Worker
安装配置zk集,并启动zk集
停⽌spark所有服务,修改配置⽂件spark-env.sh,在该配置⽂件中删掉SPARK_MASTER_IP并添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-veryMode=ZOOKEEPER -keeper.url=zk1,zk2,zk3 -keeper.dir=/spark"
1.在node1节点上修改slaves配置⽂件内容指定worker节点
2.在node1上执⾏$SPARK_HOME/sbin/start-all.sh,然后在node2上执⾏$SPARK_HOME/sbin/start-master.sh启动第⼆个Master
执⾏第⼀个spark程序
$SPARK_HOME/bin/spark-submit --class org.amples.SparkPi --master spark://localhost:7077 --executor-memory 1G --total-executor-cores 1 $SPARK_HOME/examples/jars/spark-examples_2.11-2.2.2.jar 100 spark Shell
spark-shell是Spark⾃带的交互式Shell程序,⽅便⽤户进⾏交互式编程,⽤户可以在该命令⾏下⽤scala编写spark程序。
$SPARK_HOME/bin/spark-shell \
--master spark://localhost:7077 \
--executor-memory 2g \
--total-executor-cores 2
参数说明:
--master spark://localhost:7077 指定Master的地址
--executor-memory 2g 指定每个worker可⽤内存为2G
--total-executor-cores 2 指定整个集使⽤的cup核数为2个
注意:
如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执⾏spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动⼀个进程,没有
与集建⽴联系。Spark Shell中已经默认将SparkContext类初始化为对象sc。⽤户代码如果需要⽤到,则直接应⽤sc即可
spark shell中编写WordCount
在spark shell中⽤scala语⾔编写spark程序
说明:
sc是SparkContext对象,该对象时提交spark程序的⼊⼝
textFile("file:///root/tmp/words.dta") 从本地⽂件中读取数据
flatMap(_.split(" ")) 先map在压平
map((_,1)) 将单词和1构成元组
reduceByKey(+) 按照key进⾏reduce,并将value累加
saveAsTextFile("file:///root/tmp/out") 将结果写⼊到指定位置
spark RDD
RDD概述
什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表⼀个不可变、可分区、⾥⾯的元素可并⾏计算的集合。RDD具有数据流模型的特点:⾃动容错、位置感知性调度和可伸缩性。RDD允许⽤户在执⾏多个查询时显式地将⼯作集缓存在内存中,后续的查询能够重⽤⼯作集,这极⼤地提升了查询速度。RDD的属性
⼀组分⽚(Partition),即数据集的基本组成单位。对于RDD来说,每个分⽚都会被⼀个计算任务处理,并决定并⾏计算的粒度。⽤户可以在创建RDD时指定RDD的分⽚个数,如果没有指定,那么就会采⽤默认值。默认值就是程序所分配到的CPU Core的数⽬。
⼀个计算每个分区的函数。Spark中RDD的计算是以分⽚为单位的,每个RDD都会实现compute函数以达到这个⽬的。compute函数会对迭代器进⾏复合,不需要保存每次计算的结果。
RDD之间的依赖关系。RDD的每次转换都会⽣成⼀个新的RDD,所以RDD之间就会形成类似于流⽔线⼀样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,⽽不是对RDD的所有分区进⾏重新计算。
⼀个Partitioner,即RDD的分⽚函数。当前Spark中实现了两种类型的分⽚函数,⼀个是基于哈希的HashPartitioner,另外⼀个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,⾮key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本⾝的分⽚数量,也决定了parent RDD Shuffle输出时的分⽚数量。
⼀个列表,存储存取每个Partition的优先位置(preferred location)。对于⼀个HDFS⽂件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进⾏任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
创建RDD
由⼀个已经存在的Scala集合创建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
由外部存储系统的数据集创建,包括本地的⽂件系统,还有所有Hadoop⽀持的数据集,⽐如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://localhost:9000/")
RDD编程模型
Transformation
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应⽤到基础数据集(例如⼀个⽂件)上的转换动作。只有当发⽣⼀个要求返回结果给Driver的动作时,这些转换才会真正运⾏。这种设计让Spark更加有效率地运⾏。
常⽤的Transformation:
转换含义
map(func)返回⼀个新的RDD,该RDD由每⼀个输⼊元素经过func函数转换后组成
filter(func)返回⼀个新的RDD,该RDD由经过func函数计算后返回值为true的输⼊元素组成
flatMap(func)类似于map,但是每⼀个输⼊元素可以被映射为0或多个输出元素(所以func应该返回⼀个序列,⽽不是单⼀元素)mapPartitions(func)类似于map,但独⽴地在RDD的每⼀个分⽚上运⾏,因此在类型为T的RDD上运⾏时,func的函数类型必须是Iterator[T] =>
Iterator[U]
mapPartitionsWithIndex(func)类似于mapPartitions,但func带有⼀个整数参数表⽰分⽚的索引值,因此在类型为T的RDD上运⾏时,func的函数类型必须是 (Int,
Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)根据fraction指定的⽐例对数据进⾏采样,可以选择是否使⽤随机数进⾏替换,seed⽤于指定随机数⽣成器种⼦
union(otherDataset)对源RDD和参数RDD求并集后返回⼀个新的RDD
intersection(otherDataset)对源RDD和参数RDD求交集后返回⼀个新的RDD
distinct([numTasks]))对源RDD进⾏去重后返回⼀个新的RDD
groupByKey([numTasks])在⼀个(K,V)的RDD上调⽤,返回⼀个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])在⼀个(K,V)的RDD上调⽤,返回⼀个(K,V)的RDD,使⽤指定的reduce函数,将相同key的值聚合到⼀起,与groupByKey类
似,reduce任务的个数可以通过第⼆个可选的参数来设置
aggregateByKey(zeroValue)(seqOp,
combOp, [numTasks])
sortByKey([ascending], [numTasks])在⼀个(K,V)的RDD上调⽤,K必须实现Ordered接⼝,返回⼀个按照key进⾏排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])与sortByKey类似,但是更灵活
join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调⽤,返回⼀个相同key对应的所有元素对在⼀起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调⽤,返回⼀个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset)笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Action
动作含义
reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect()在驱动程序中,以数组的形式返回数据集的所有元素
count()返回RDD的元素个数
first()返回RDD的第⼀个元素(类似于take(1))
take(n)返回⼀个由数据集的前n个元素组成的数组
takeSample(withReplacement,num,返回⼀个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否⽤随机数替换不⾜的部分,seed⽤于指定随机数⽣成器
[seed])种⼦
动作含义
takeOrdered(n, [ordering])
saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS⽂件系统或者其他⽀持的⽂件系统,对于每个元素,Spark将会调⽤toString⽅法,将它装换为⽂件中的⽂本
saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的⽬录下,可以使HDFS或者其他Hadoop⽀持的⽂件系统。saveAsObjectFile(path)
countByKey()针对(K,V)类型的RDD,返回⼀个(K,Int)的map,表⽰每⼀个key对应的元素个数。
foreach(func)在数据集的每⼀个元素上,运⾏函数func进⾏更新。
RDD的依赖关系
RDD和它依赖的⽗RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
shuffle重要的依据:⽗RDD的⼀个分区的数据,要给⼦RDD的多个分区
窄依赖
窄依赖指的是每⼀个⽗RDD的Partition最多被⼦RDD的⼀个Partition使⽤
总结:窄依赖我们形象的⽐喻为独⽣⼦⼥
宽依赖
宽依赖指的是多个⼦RDD的Partition会依赖同⼀个⽗RDD的Partition
总结:窄依赖我们形象的⽐喻为超⽣
Lineage
RDD只⽀持粗粒度转换,即在⼤量记录上执⾏的单个操作。将创建RDD的⼀系列Lineage(即⾎统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换⾏为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD的缓存
RDD通过persist⽅法或cache⽅法可以将前⾯的计算结果缓存,但是并不是这两个⽅法被调⽤时⽴即缓
存,⽽是触发后⾯的action时,该RDD将会被缓存在计算节点的内存中,并供后⾯重⽤。
cache最终也是调⽤了persist⽅法,默认的存储级别都是仅在内存存储⼀份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。