SparkStreaming⼯作原理
⼀:SparkCore,SparkSQL和SparkStreaming的类似之处
(⼀)SparkCore
Spark Core主要是作为离线批处理(Batch Processing),每次处理的数据都是⼀个固定的数据集,⽽不是变化的
相关概念:
RDD:弹性分布式数据集
Spark Context:Spark的上下⽂,它负责与程序和spark集进⾏交互,包括申请集资源、创建RDD、accumulators及⼴播变量等。
(⼆)Spark SQL
Spark SQL⽤于交互式处理(interactive Processing),同样的,每次处理的数据都是⼀个固定的数据集,⽽不是变化的
相关概念:
DataFrame=RDD+Schema
DataFrame:相当于⼀个Row类型的DataSet,在Spark 2.x之后推荐使⽤DataSet
SQLContext:SQL的上下⽂
(三)Spark Streaming
Spark Streaming是⼀个流式数据处理(Stream Processing)的框架,要处理的数据就像流⽔⼀样源源不断的产⽣,就需要实时处理。
在Spark Streaming中,对于Spark Core进⾏了API的封装和扩展,将流式的数据切分为⼩批次(batch,称之为微批,按照时间间隔切分)进⾏处理,可以⽤于进⾏⼤规模、⾼吞吐量、容错的实时数据流的处理。---同Storm相⽐:Storm是来⼀条数据处理⼀条数据,是真正意义上的实时处理
⽀持从很多种数据源中读取数据,使⽤算⼦来进⾏数据处理,处理后的数据可以被保存到⽂件系统、数据库等存储中
相关概念:
DStream:离散流,相当于是⼀个数据的集合
StreamingContext:在创建StreamingContext的时候,会⾃动的创建SparkContext对象
对于电商来说,每时每刻都会产⽣数据(如订单,⽹页的浏览数据等),这些数据就需要实时的数据处理
将源源不断产⽣的数据实时收集并实时计算,尽可能快的得到计算结果并展⽰。
⼆:Spark Streaming组成部分
hbase工作原理(⼀)数据源
⼤多情况从Kafka中获取数据,还可以从Flume中直接获取,还能从TCP Socket中获取数据(⼀般⽤于开发测试)
(⼆)数据处理
主要通过DStream针对不同的业务需求使⽤不同的⽅法(算⼦)对数据进⾏相关操作。
企业中最多的两种类型统计:实时累加统计(如统计某电商销售额)会⽤到DStream中的算⼦updateStateBykey、实时统计某段时间内的数据(如对趋势进⾏统计分析,实时查看最近20分钟内各个省份⽤户点击⼴告的流量统计)会⽤到reduceByKeyAndWindow这个算⼦。
(三)存储结果
调⽤RDD中的API将数据进⾏存储,因为Spark Streaming是将数据分为微批处理的,所以每⼀批次就相当于⼀个RDD。
可以把结果存储到Console(控制台打印,开发测试)、Redis(基于内存的分布式Key-Value数据库)、HBase(分布式列式数据库)、RDBMS(关系型数据库,如MySQL,通过JDBC)
三:SparkStreaming的运⾏流程
(⼀)运⾏流程图
(⼆)运⾏流程
1、我们在集中的其中⼀台机器上提交我们的Application Jar,然后就会产⽣⼀个Application,开启⼀个Driver,然后初始化SparkStreaming的程序⼊⼝StreamingContext;
2、Master(Driver是spark作业的Master)会为这个Application的运⾏分配资源,在集中的⼀台或者多台Worker上⾯开启Executer,executer会向Driver注册;
3、Driver服务器会发送多个receiver给开启的executer,(receiver是⼀个接收器,是⽤来接收消息的,在excuter⾥⾯运⾏的时候,其实就相当于⼀个task任务)
每个作业包含多个Executor,每个Executor以线程的⽅式运⾏task,Spark Streaming⾄少包含⼀个receiver task。
4、receiver接收到数据后,每隔200ms就⽣成⼀个block块,就是⼀个rdd的分区,然后这些block块就存储在executer⾥⾯,block块的存储级别是Memory_And_Disk_2;
5、receiver产⽣了这些block块后会把这些block块的信息发送给StreamingContext;
6、StreamingContext接收到这些数据后,会根据⼀定的规则将这些产⽣的block块定义成⼀个rdd;
四:Spark Streaming⼯作原理
(⼀)SparkStreaming⼯作原理
补充:
BlockInterval:200ms  ⽣成block块的依据,多久内的数据⽣成⼀个block块,默认值200ms⽣成⼀个block块,官⽹最⼩推荐值50ms。
BatchInterval:1s  我们将每秒的数据抽象为⼀个RDD。那么这个RDD⾥⾯包含了多个block(1s则是50个RDD),这些block是分散的存储在各个节点上的。
Spark Streaming内部的基本⼯作原理:
接收实时输⼊数据流,然后将数据拆分成多个batch,⽐如每收集1s的数据封装为⼀个batch,然后将每个batch交给Spark的计算引擎进⾏处理,最后会⽣产出⼀个结果数据流,
其中的数据,也是⼀个个的batch所组成的。
其中,⼀个batchInterval累加读取到的数据对应⼀个RDD的数据
(⼆)SparkStreaming和Storm对⽐
1.对⽐优势
Storm在实时延迟度上,⽐Spark Streaming就好多了,Storm是纯实时,Spark Streaming是准实时;⽽且Storm的事务机制,健壮性/容错性、动态调整并⾏度等特性,都要⽐
Spark Streaming更加优秀。
Spark Streaming的真正优势(Storm绝对⽐不上的),是它属于Spark⽣态技术栈中,因此Spark Streaming可以和Spark Core、Spark SQL⽆缝整合,⽽这也就意味着,我们可
以对实时处理出来的中间数据,⽴即在程序中⽆缝进⾏延迟批处理、交互式查询等操作,这个特点⼤⼤增强了Spark Streaming的优势和功能。
2.应⽤场景
1、建议在那种需要纯实时,不能忍受1s以上延迟的场景下使⽤,⽐如⾦融系统,要求纯实时进⾏⾦融交易和分析;
2、如果对于实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,⼀条也不能多,⼀条也不能少,也可以考虑使⽤Strom;
3、如果需要针对⾼峰低峰时间段,动态调整实时计算程序的并⾏度,以最⼤限度利⽤集资源,也可以考虑⽤Storm;
4、如果⼀个⼤数据应⽤系统,它就是纯粹的实时计算,不需要在中间执⾏SQL交互式查询、复杂的transformation算⼦等,那么使⽤Storm是⽐较好的选择
storm
1、如果对上述适⽤于Storm的三点,⼀条都不满⾜的实时场景,即,不要求纯实时,不要求强⼤可靠的事务机制,不要求动态调整并⾏度,那么可以考虑使⽤Spark Streaming;
2、考虑使⽤Spark Streaming最主要的⼀个因素,应该是针对整个项⽬进⾏宏观的考虑,即,如果⼀个项⽬除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,⽽且实时计算中,可能还会牵扯到⾼延迟批处理、交互式查询等功能,那sparkstreaming
五:DStream离散流
DStream离散流是Spark Streaming提供的⼀种⾼级抽象,DStream代表了⼀个持续不断的数据流;
DStream可以通过输⼊数据源来创建,⽐如Kafka、Flume,也可以通过对其他DStream应⽤⾼阶函数来创建,⽐如map、reduce、join、window;
(⼀)DStream原理(与四中补充信息有关联)
DStream的内部,其实是⼀系列持续不断产⽣的RDD,RDD是Spark Core的核⼼抽象,即,不可变的,分布式的数据集;
DStream中的每个RDD都包含了⼀个时间段内的数据;
以下图为例,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了
(⼆)DStream算⼦⼯作
对DStream应⽤的算⼦,其实在底层会被翻译为对DStream中每个RDD的操作。
⽐如对⼀个DStream执⾏⼀个map操作,会产⽣⼀个新的DStream,其底层原理为,对输⼊DStream中的每个时间段的RDD,都应⽤⼀遍map操作,然后⽣成的RDD,即作为新的DStream中的那个时间段的⼀个RDD;
底层的RDD的transformation操作,还是由Spark Core的计算引擎来实现的,Spark Streaming对Spark core进⾏了⼀层封装,隐藏了细节,然后对开发⼈员提供了⽅便易⽤的⾼层次API。
(三)DStream算⼦
六:StreamingContext的使⽤
(⼀)StreamingContext的创建
1.直接使⽤sparkconf配置创建
val conf = new SparkConf().setAppName(appName).setMaster("local");
val ssc = new StreamingContext(conf, Seconds(1));
2.使⽤已创建的sparkcontext创建
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1));  //batch interval可以根据你的应⽤程序的延迟要求以及可⽤的集资源情况来设置
(⼆)StreamingContext的使⽤
1.通过创建输⼊DStream来创建输⼊数据源。
2.通过对DStream定义transformation和output算⼦操作,来定义实时计算逻辑。
3.调⽤StreamingContext的start()⽅法,来开始实时处理数据。
4.调⽤StreamingContext的awaitTermination()⽅法,来等待应⽤程序的终⽌。可以使⽤CTRL+C⼿动停⽌,或者就是让它持续不断的运⾏进⾏计算。
5.也可以通过调⽤StreamingContext的stop()⽅法,来停⽌应⽤程序。
(三)注意事项
1.只要⼀个StreamingContext启动之后,就不能再往其中添加任何计算逻辑了。⽐如执⾏start()⽅法之后,
还给某个DStream执⾏⼀个算⼦。
2.⼀个StreamingContext停⽌之后,是肯定不能够重启的,调⽤stop()之后,不能再调⽤start()
3.⼀个JVM同时只能有⼀个StreamingContext启动,在你的应⽤程序中,不能创建两个StreamingContext。
4.调⽤stop()⽅法时,会同时停⽌内部的SparkContext,如果不希望如此,还希望后⾯继续使⽤SparkContext创建其他类型的Context,⽐如SQLContext,那么就⽤stop(false)。
5.⼀个SparkContext可以创建多个StreamingContext,只要上⼀个先⽤stop(false)停⽌,再创建下⼀个即可。
七:代码编写
(⼀)从Socket获取数据---⽤于测试
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("WordCount").setMaster("local[5]")
val sc = new SparkContext(conf)
//设置streamingcontext
val scc = new StreamingContext(sc,Seconds(2))
//数据输⼊
val inDStream:ReceiverInputDStream[String] = scc.socketTextStream("localhost", 9090)
inDStream.print()  //数据打印
//数据处理
val resultDStream:DStream[(String,Int)]=inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
//数据输出
resultDStream.print()
//启动应⽤程序
scc.start()
scc.awaitTermination()
scc.stop()
}
nc下载地址:,在cmd中使⽤:
(⼆)设置hdfs获取数据
⼋:缓存与持久化机制
与RDD类似,Spark Streaming也可以让开发⼈员⼿动控制,将数据流中的数据持久化到内存中,对DStream调⽤persist()⽅法,就可以让Spark Streaming⾃动将该数据流中的所有产⽣的RDD都持久化到内存中。如果要对⼀个DStream多次执⾏操作,那么,对DSteram持久化是⾮常有⽤的。因为多次操作,可以共享使⽤内存中的⼀份缓存数据。
对于基础窗⼝的操作,⽐如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,⽐如updateStateByKey,默认就隐式开启了持久化机制,即Spark Streaming 默认就会将上述操作产⽣的DStream中的数据,缓存到内存中,不需要开发⼈员⼿动调⽤persist()⽅法。
对于通过⽹络接收数据的输⼊流,⽐如Socket、Kafka、Flume等,默认的持久化级别是将数据复制⼀份,以便于容错,相当于⽤的是MEMORY_ONLY_SER_2。
与Spark Core中的RDD不同的是,默认的持久化级别,统⼀都是要序列化的。
九:Checkpoint机制
每⼀个Spark Streaming应⽤,正常来说都是要7x24⼩时运转的,这就是实时计算程序的特点。要持续不断的对数据进⾏计算,必须要能够对于应⽤程序逻辑⽆关的失败进⾏容错。
对于⼀些将多个batch的数据进⾏聚合的,有状态的transformation操作,这是⾮常有⽤的。在这种transformation操作中,⽣成的RDD是依赖之前的batch中的RDD的,这样就会随着时间的推移,依赖链条越来越长,从⽽导致失败恢复时间也变得越来越差。有状态的transformation操作执⾏过程当中产⽣的RDD要定期的被checkpoint到可靠的存储上,这样做可以消减RDD的依赖链条,从⽽缩短恢复时间。
当使⽤了有状态的transformation操作时,必须要开启checkpoint机制,提供checkpoint⽬录。
注意,并不是所有的Spark Streaming应⽤程序都要启⽤checkpoint机制
如何启⽤Checkpoint机制:
1.配置⼀个⽂件系统(⽐如HDFS)的⽬录,作为checkpoint⽬录
2.使⽤StreamingContext的checkpoint⽅法,填⼊配置好的⽬录作为参数即
⼗:注意事项
如果要在实时计算应⽤中并⾏接收多条数据流,可以创建多个输⼊DStream,这样就会创建多个Receiver,从⽽并⾏地接收多个数据流。这⾥有⼀个问题,⼀个Spark Streaming 应⽤程序的executor是⼀个长期运⾏的任务,所以它会独占分配给Spark Streaming应⽤程序的CPU core,所以只要Spark Streaming运⾏起来之后,这个节点上的CPU core数就没有办法给其他的应⽤所使⽤了,因为会被Receiver所独占。
使⽤本地模式运⾏程序时,必须使⽤local[n],n>=2绝对不能⽤local和local[1],因为就会给执⾏输⼊DStream的executor分配⼀个线程,Spark Streaming底层的原理需要⾄少有两个线程,⼀个线程分配给Receiver接收数据,另⼀个线程⽤来处理接收到的数据。如果线程⼩于2的话,那么程序只会接收数据,不会处理数据。
如果直接将Spark Streaming应⽤提交到集上运⾏,需要保证有⾜够资源。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。