sparkapi之⼀:Spark官⽅⽂档-中⽂翻译
1 概述(Overview)
总体来讲,每⼀个Spark驱动程序应⽤都由⼀个驱动程序组成,该驱动程序包含⼀个由⽤户编写的main⽅法,该⽅法会在集上并⾏执⾏⼀些列并⾏计算操作。Spark最重要的⼀个概念是弹性分布式数据集,简称RDD(resilient distributed dataset )。RDD是⼀个数据容器,它将分布在集上各个节点上的数据抽象为⼀个数据集,并且RDD能够进⾏⼀系列的并⾏计算操作。可以将RDD理解为⼀个分布式的List,该List的数据为分布在各个节点上的数据。RDD通过读取Hadoop⽂件系统中的⼀个⽂件进⾏创建,也可以由⼀个RDD经过转换得到。⽤户也可以将RDD缓存⾄内存,从⽽⾼效的处理RDD,提⾼计算效率。另外,RDD有良好的容错机制。
Spark另外⼀个重要的概念是共享变量(shared variables)。在并⾏计算时,可以⽅便的使⽤共享变量。在默认情况下,执⾏Spark任务时会在多个节点上并⾏执⾏多个task,Spark将每个变量的副本分发给各个task。在⼀些场景下,需要⼀个能够在各个task间共享的变量。Spark⽀持两种类型的共享变量:
⼴播变量(broadcast variables):将⼀个只读变量缓存到集的每个节点上。例如,将⼀份数据的只读缓存分发到每个节点。
累加变量(accumulators):只允许add操作,⽤于计数、求和。
2 引⼊Spark(Linking with Spark)
在Spark 1.6.0上编写应⽤程序,⽀持使⽤Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使⽤Java 8,⽀持lambda表达式(lambda expressions)。
在编写Spark应⽤时,需要在Maven依赖中添加Spark,Spark的Maven Central为:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0
另外,如果Spark应⽤中需要访问HDFS集,则需要在hadoop-client中添加对应版本的HDFS依赖:
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,需要在程序中添加Spark类。代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在Spark 1.3.0之前的版本,使⽤Scala语⾔编写Spark应⽤程序时,需要添加import org.apache.spark.SparkContext._来启⽤必要的隐式转换)
3 初始化Spark(Initializing Spark)
使⽤Scala编写Spark程序的需要做的第⼀件事就是创建⼀个SparkContext对象(使⽤Java语⾔时创建JavaSparkContext)。SparkContext对象指定了Spark应⽤访问集的⽅式。创建SparkContext需要先创建⼀个SparkConf对象,SparkConf对象包含了Spark应⽤的⼀些列信息。代码如下:
Scala
val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);
appName参数为应⽤程序在集的UI上显⽰的名字。master为Spark、Mesos、YARN URL或local。使⽤local值时,表⽰在本地模式下运⾏程序。应⽤程序的执⾏模型也可以在使⽤spark-submit命令提交任务时进⾏指定。
3.1 使⽤Spark Shell(Using the Shell)
scala不是内部或外部命令在Spark Shell下,⼀个特殊的SparkContext对象已经帮⽤户创建好,变量为sc。使⽤参数--master设置master参数值,使⽤参数--jars设置依赖包,多个jar包使⽤逗号分隔。可以使⽤--packages参数指定Maven坐标来添加依赖包,多个坐标使⽤逗号分隔。可以使⽤参数--repositories添加外部的repository。⽰例如下:
本地模式下,使⽤4个核运⾏Spark程序:
$ ./bin/spark-shell --master local[4]
将code.jar包添加到classpath:
$ ./bin/spark-shell --master local[4] --jars code.jar
使⽤Maven坐标添加⼀个依赖:
$ ./bin/spark-shell --master local[4] --packages "ample:example:0.1"
详细的Spark Shell参数描述请执⾏命令spark-shell --help。更多的spark-submit脚本请见。
4 弹性分布式数据集(RDDs)
Spark最重要的⼀个概念就是RDD,RDD是⼀个有容错机制的元素容器,它可以进⾏并⾏运算操作。得到RDD的⽅式有两个:通过并⾏化驱动程序中已有的⼀个集合⽽获得
通过外部存储系统(例如共享的⽂件系统、HDFS、HBase等)的数据集进⾏创建
4.1 并⾏集合(Parallelized Collections)
在驱动程序中,在⼀个已经存在的集合上(例如⼀个Scala的Seq)调⽤SparkContext的parallelize⽅法可以创建⼀个并⾏集合。集合⾥的元素将被复制到⼀个可被并⾏操作的分布式数据集中。下⾯为并⾏化⼀个保存数字1到5的集合⽰例:
Scala
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data);
当分布式数据集创建之后,就可以进⾏并⾏操作。例如,可以调⽤⽅法duce((a,b) => a + b)求数组内元素的和。Spark⽀持的分布式数据集上的操作将在后⾯章节中详细描述。
并⾏集合的⼀个重要的参数是表⽰将数据划分为⼏个分区(partition)的分区数。Spark将在集上每个数据分区上启动⼀个task。通常情况下,你可以在集上为每个CPU设置2-4个分区。⼀般情况下,Spark基于集⾃动设置分区数⽬。也可以⼿动进⾏设置,设置该参数需要将参数值作为第⼆参数传给parallelize⽅法,例如:sc.parallelize(data, 10)。注意:在代码中,部分位置使⽤术语slices(⽽不是partition),这么做的原因是为了保持版本的向后兼容性。
4.2 外部数据库(External Datasets)
Spark可以通过Hadoop⽀持的外部数据源创建分布式数据集,Hadoop⽀持的数据源有本地⽂件系统、HDFS、Cassandra、HBase、、Spark⽀持的⽂本⽂件、、Hadoop 。
SparkContext的testFile⽅法可以创建⽂本⽂件RDD。使⽤这个⽅法需要传递⽂本⽂件的URI,URI可以为本机⽂件路径、hdfs://、
s3n://等。该⽅法读取⽂本⽂件的每⼀⾏⾄容器中。⽰例如下:
Scala
scala> val distFile = sc.textFile("") distFile: RDD[String] = MappedRDD@1d4cee08
Java
JavaRDD<String> distFile = sc.textFile("");
创建之后,distFile就可以进⾏数据集的通⽤操作。例如,使⽤map和reduce操作计算所有⾏的长度的总和:distFile.map(s =>
s.length).reduce((a, b) => a + b)。
使⽤Spark读取⽂件需要注意⼀下⼏点:
程序中如果使⽤到本地⽂件路径,在其它worker节点上该⽂件必须在同⼀⽬录,并有访问权限。在这种情况下,可以将⽂件复制到所有的worker节点,也可以使⽤⽹络内的共享⽂件系统。
Spark所有的基于⽂件输⼊的⽅法(包括textFile),都⽀持⽂件夹、压缩⽂件、通配符。例
如:textFile("/my/directory")、textFile("/my/directory/*.txt")、textFile("/my/directory/*.gz")。
textFile⽅法提供了⼀个可选的第⼆参数,⽤于控制⽂件的分区数。默认情况下,Spark为⽂件的每个块创建⼀个分区(块使⽤HDFS 的默认值64MB),通过设置这个第⼆参数可以修改这个默认值。需要注意的是,分区数不能⼩于块数。
除了⽂本⽂件之外,Spark还⽀持其它的数据格式:
SparkContext.wholeTextFiles能够读取指定⽬录下的许多⼩⽂本⽂件,返回(filename,content)对。⽽textFile只能读取⼀个⽂本⽂件,返回该⽂本⽂件的每⼀⾏。
对于可以使⽤SparkContext的sequenceFile[K,V]⽅法,其中K是⽂件中key和value的类型。它们必须为像IntWritable和Text那样,是Hadoop的Writable接⼝的⼦类。另外,对于通⽤的Writable,Spark允许⽤户指定原⽣类型。例如,sequenceFile[Int,String]将⾃动读取IntWritable和Text。
对于其他Hadoop InputFormat,可以使⽤SparkContext.hadoopRDD⽅法,该⽅法接收任意类型的JobConf和输⼊格式类、键类型和值类型。可以像设置Hadoop job那样设置输⼊源。对于InputFormat还可以使⽤基于新版本MapReduce
API(org.apache.hadoop.mapreduce)的wAPIHadoopRDD。(⽼版本接⼝为:Spark
RDD.saveAsObjectFile和SparkContext.objectFile能够保存包含简单的序列化Java对象的RDD。但是这个⽅法不如Avro⾼效,Avro能够⽅便的保存任何RDD。
4.3 RDD操作(RDD Operations)
RDD⽀持两种类型的操作:
transformation:从⼀个RDD转换为⼀个新的RDD。
action:基于⼀个数据集进⾏运算,并返回RDD。
例如,map是⼀个transformation操作,map将数据集的每⼀个元素按指定的函数转换为⼀个RDD返回。reduce是⼀个action操
作,reduce将RDD的所有元素按指定的函数进⾏聚合并返回结果给驱动程序(还有⼀个并⾏的reduceByKey能够返回⼀个分布式的数据集)。
Spark的所有transformation操作都是懒执⾏,它们并不⽴马执⾏,⽽是先记录对数据集的⼀系列transf
ormation操作。在执⾏⼀个需要执⾏⼀个action操作时,会执⾏该数据集上所有的transformation操作,然后返回结果。这种设计让Spark的运算更加⾼效,例如,对⼀个数据集map操作之后使⽤reduce只返回结果,⽽不返回庞⼤的map运算的结果集。
默认情况下,每个转换的RDD在执⾏action操作时都会重新计算。即使两个action操作会使⽤同⼀个转换的RDD,该RDD也会重新计算。在这种情况下,可以使⽤persist⽅法或cache⽅法将RDD缓存到内存,这样在下次使⽤这个RDD时将会提⾼计算效率。在这⾥,也⽀持将RDD持久化到磁盘,或在多个节点上复制。
4.3.1 基础(Basics)
参考下⾯的程序,了解RDD的基本轮廓:
Scala
val lines = sc.textFile("")
val lineLengths = lines.map(s => s.length)
val totalLength = duce((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = duce((a, b) -> a + b);
第⼀⾏通过读取⼀个⽂件创建了⼀个基本的RDD。这个数据集没有加载到内存,也没有进⾏其他的操作,变量lines仅仅是⼀个指向⽂件的指针。第⼆⾏为transformation操作map的结果。此时lineLengths也没有进⾏运算,因为map操作为懒执⾏。最后,执⾏action操作reduce。此时Spark将运算分隔成多个任务分发给多个机器,每个机器执⾏各⾃部分的map并进⾏本地reduce,最后返回运⾏结果给驱动程序。
如果在后⾯的运算中仍会⽤到lineLengths,可以将其缓存,在reduce操作之前添加如下代码,该persist操作将在lineLengths第⼀次被计算得到后将其缓存到内存:
Scala
lineLengths.persist()
Java
lineLengths.persist(StorageLevel.MEMORY_ONLY());
4.3.2 把函数传递到Spark(Passing Functions to Spark)
Scala
Spark的API,在很⼤程度上依赖于把驱动程序中的函数传递到集上运⾏。这有两种推荐的实现⽅式:
使⽤的语法,这可以让代码更加简洁。
使⽤全局单例对象的静态⽅法。⽐如,你可以定义函数对象object MyFunctions,然后将该对象的MyFunction.func1⽅法传递给Spark,如下所⽰:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论