Spark官⽅⽂档-SparkCore
快速开始
RDD
1.Spark 的主要抽象是⼀个分布式弹性数据集(RDD),可以从 Hadoop InputFormats(例如 HDFS ⽂件)或通过转换其他RDD来创建RDD。
val textFile = File("README.md")
2.可以通过调⽤某些操作直接从 RDD中获取值,或者转换RDD以获得新的值
textFile.first() // First item in this Dataset
3.将此数据集转换为新的数据集。我们调⽤filter返回⼀个包含⽂件中RDD⼦集的新数据集。
val linesWithSpark = textFile.filter(line => ains("Spark"))
更多关于数据集操作
1.到单词最多的那⼀⾏:
先计算每⾏单词数;reduce在该数据集上调⽤以查最⼤字数;
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
RDD
初始化 Spark
Spark 程序必须做的第⼀件事是创建⼀个对象,它告诉 Spark 如何访问集。要创建⼀个,SparkContext您⾸先需要构建⼀个对象,该对象包含有关您的应⽤程序的信息。每个 JVM 应该只有⼀个 SparkContext 处于活动状态。stop()在创建新的之前,您必须激活SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
该appName参数是您的应⽤程序在集 UI 上显⽰的名称。 master是⼀个,或者⼀个特殊的“本地”字
符串以在本地模式下运⾏。 Using the Shell
$ ./bin/spark-shell --master local[4] --jars code.jar
弹性分布式数据集 (RDD)
Spark 围绕弹性分布式数据集(RDD)的概念展开,RDD 是可以并⾏操作的元素的容错集合。
有两种⽅法可以创建 RDD: 在驱动程序中并⾏化现有集合,或引⽤外部存储系统中的数据集,例如共享⽂件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
并⾏集合
并⾏化集合是通过在驱动程序(Scala )中的现有集合上调⽤SparkContext的parallelize⽅法来创建的Seq。集合的元素被复制以形成可以并⾏操作的分布式数据集。例如,这⾥是如何创建⼀个包含数字 1 到 5 的并⾏化集合:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
创建后,distData可以并⾏操作分布式数据集 ( )。例如,我们可能会调⽤duce((a, b) => a + b)将数组的元素相加。稍后我们将描述分布式数据集上的操作。
并⾏集合的⼀个重要参数是将数据集切割成的分区数。Spark 将为集的每个分区运⾏⼀个任务。通常,集中的每个 CPU 需要 2-4 个分区。通常,Spark 会尝试根据您的集⾃动设置分区数。但是,您也可以通过将其作为第⼆个参数传递给parallelize(例
如sc.parallelize(data, 10))来⼿动设置它。注意:代码中的某些地⽅使⽤术语切⽚(分区的同义词)来保持向后兼容性。
外部数据集
Spark 可以从 Hadoop ⽀持的任何存储源创建分布式数据集,包括本地⽂件系统、HDFS、Cassandra、HBase、等。Spark ⽀持⽂本⽂件、和任何其他 Hadoop 。
可以使⽤SparkContext的textFile⽅法创建⽂本⽂件 RDD 。此⽅法需要⼀个URI的⽂件(本地路径的机器上,或⼀个hdfs://,s3a://等URI),并读取其作为⾏的集合。这是⼀个⽰例调⽤:
val distFile = sc.textFile("")
使⽤ Spark 读取⽂件的⼀些注意事项:
如果使⽤本地⽂件系统上的路径,则该⽂件也必须可以在所有⼯作节点上的相同路径上访问。要么将⽂件复制到所有⼯作节点,要么使⽤⽹络安装的共享⽂件系统。
Spark 的所有基于⽂件的输⼊⽅法,包括textFile,都⽀持在⽬录、压缩⽂件和通配符上运⾏。例如,你可以使
⽤textFile("/my/directory"),textFile("/my/directory/*.txt")和textFile("/my/directory/*.gz")。当读取多个⽂件时,分区的顺序取决于⽂件从⽂件系统返回的顺序。例如,它可能会也可能不会遵循⽂件的字典顺序(按路径)。在分区内,元素根据它们在底层⽂件中的顺序进⾏排序。
该textFile⽅法还采⽤可选的第⼆个参数来控制⽂件的分区数。默认情况下,Spark 为⽂件的每个块创建⼀个分区(在 HDFS 中默认块为 128MB),但您也可以通过传递更⼤的值来请求更多的分区。请注意,分区数不能少于块数。
除了⽂本⽂件,Spark 的 Scala API 还⽀持其他⼏种数据格式:
SparkContext.wholeTextFiles允许您读取包含多个⼩⽂本⽂件的⽬录,并将每个⽂件作为(⽂件名、内容)的⼆元组对返回。
与textFile对⽐,后者将在每个⽂件中的每⾏的⼀条记录返回。分区由数据本地性决定,在某些情况下,
这可能导致分区太少(本地数据较少)。对于这些情况,wholeTextFiles提供可选的第⼆个参数来控制最⼩分区数。
对于(K,V存储格式的⽂件),使⽤ SparkContext 的sequenceFile[K, V]⽅法,其中K和V是⽂件中键和值的类型。这些应该是Hadoop 的接⼝的⼦类,如和。此外,Spark 允许您为⼀些常见的 Writable 指定基础类型;例如,sequenceFile[Int, String]会⾃动读取 IntWritables 和 Texts。
对于其他 Hadoop InputFormats,您可以使⽤该SparkContext.hadoopRDD⽅法,该⽅法采⽤任意JobConf输⼊格式类、键类和值类。以与使⽤输⼊源的 Hadoop 作业相同的⽅式设置这些。您还可以使⽤wAPIHadoopRDD基
于“新”MapReduce API ( org.apache.hadoop.mapreduce) 的InputFormats 。
RDD.saveAsObjectFile和SparkContext.objectFile⽀持,⽤序列化的 Java 对象,这种简单格式来组成RDD,并⽤来保存RDD。虽然这不如 Avro 之类的专⽤格式有效,但它提供了⼀种保存任何 RDD 的简单⽅法。
RDD 操作
RDD ⽀持两种类型的操作:
transforms(转换)它从现有的数据集创建⼀个新的数据集,
actions(⾏动),它在对数据集运⾏计算后返回⼀个值给Driver程序。例如,map是⼀个转换,它通过⼀个函数传递每个数据集元素并返回⼀个表⽰结果的新RDD。另⼀⽅⾯,reduce是使⽤某个函数聚合RDD的所有元素并将最终结果返回给Driver程序的操作(也可以⽤⼀个并⾏reduceByKey返回分布式数据集)。
Spark 中的所有转换都是惰性的,因为它们不会⽴即计算结果。相反,他们只记住应⽤于某些基本数据集(例如⽂件)的转换。仅当操作需要将结果返回到Driver程序时才计算转换(即执⾏action算⼦时)。这种设计使 Spark 能够更⾼效地运⾏。例如,我们可以意识到通过创建的数据集map将在 a 中使⽤,reduce并且仅将 的结果返回reduce给驱动程序,⽽不是更⼤的映射数据集。
默认情况下,每次在其上运⾏操作时,每个转换后的 RDD 可能会重新计算。但是,你也可以持久化RDD;使⽤内存中
的RDD persist(或cache)⽅法,在这种情况下,Spark将保存RDD在周围的集上,⽅便你下⼀次查询使⽤它。还⽀持在磁盘上持久化RDD,或跨多个节点复制。
将函数传递给 Spark
Spark 的 API 严重依赖于Driver程序中传递的函数来在集上运⾏。有两种推荐的⽅法可以做到这⼀点:
,可⽤于短代码。
全局单例对象中的静态⽅法。例如,您可以定义object MyFunctions然后传递MyFunctions.func1,如下所⽰:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
请注意,虽然也可以在类实例中传递对⽅法的引⽤(与单例对象相反),但这需要将包含该类的对象与⽅法⼀起发送。例如,考虑:
class MyClass {
scala不是内部或外部命令
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
在这⾥,若我们创建⼀个新MyClass实例并调⽤doStuff它,map那⾥的内部引⽤了该实例的 func1⽅法,因此需要将整个对象发送到集。它类似于写成 MyClass rdd.map(x => this.func1(x))
以类似的⽅式,访问外部对象的字段将引⽤整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于写成 rdd.map(x => this.field + x),它引⽤了所有的this. 为了避免这个问题,最简单的⽅法是复制field到⼀个局部变量中,⽽不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
了解闭包
Spark 的难点之⼀是在跨集执⾏代码时了解变量和⽅法的范围和⽣命周期。修改其范围之外的变量的 RDD 操作可能是⼀个常见的混淆源。在下⾯的⽰例中,我们将查看foreach()⽤于递增计数器的代码,但其他操作也会出现类似问题。
例⼦
考虑下⾯的朴素 RDD 元素 sum,根据执⾏是否在同⼀ JVM 中发⽣,其⾏为可能会有所不同。⼀个常见的例⼦是在local模式 ( --master = local[n])下运⾏ Spark与将 Spark 应⽤程序部署到集时(例如,通过 spark-submit 到 YARN):
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
本地与集模式
为了执⾏作业,Spark 将 RDD 操作的处理分解为task任务,每个任务由⼀个执⾏器执⾏。在执⾏之前,Spark 计算任务的闭包。闭包
是那些必须对执⾏器可见的变量和⽅法,以便在 RDD 上执⾏其计算(在本例中foreach())。这个闭包被序列化并发送给每个executor。
发送到每个executor的闭包中的变量为现在变量的副本,因此,当在函数中引⽤计数器时foreach,它不再是driver节点上的计数器。driver节点的内存中仍然有⼀个计数器,但这对executor不再可见!executor只能看到序列化闭包的副本。因此,counter的最终值仍然为零,因为counter上的所有操作都引⽤了序列化闭包中的值。
为了解决这些问题:Spark引⼊了累加器;
RDD 的打印元素
⼀个常见的习惯⽤法是使⽤rdd.foreach(println)或打印出 RDD 的元素rdd.map(println)。在⼀台机器上,这将⽣成预期的输出并打印所有RDD 的元素。但是,在cluster模式下,stdout执⾏程序调⽤的输出现在写⼊executor端的stdout程序,⽽不是driver上的输出,因
此stdout不会在driver端显⽰这些!要在Driver端打印所有元素(包括Executor端),可以使⽤的collect()⽅法,⾸先把RDD拉取到Driver 节点:llect().foreach(println)。但是,这可能会导致Driver内存不⾜,因为collect()将整个 RDD 提取到⼀台机器上;如果您只需要打印 RDD 的⼏个元素,更安全的⽅法是使⽤take(): rdd.take(100).foreach(println)。
使⽤键值对
虽然⼤多数 Spark 操作适⽤于包含任何类型对象的 RDD,但⼀些特殊操作仅适⽤于键值对的 RDD。最常见的是分布式“shuffle”操作,例如通过键对元素进⾏分组(grouping)或聚合(aggregating )。
在 Scala 中,这些操作在包含对象(语⾔中的内置元组,通过简单地编写创建(a, b))的上⾃动可⽤ 的。键值对操作在类中可⽤ ,它⾃动包装元组的 RDD。
例如,以下代码使⽤reduceByKey对键值对的操作来计算⽂件中每⾏⽂本出现的次数:
val lines = sc.textFile("")
val pairs = lines.map(s => (s, 1))
val counts = duceByKey((a, b) => a + b)
例如counts.sortByKey(),我们还可以使⽤来按字母顺序对这些对进⾏排序,最后 llect()将它们作为对象数组带回driver端。
注意:在键值对操作中使⽤⾃定义对象作为key时,必须确保⾃定义equals()⽅法附带匹配hashCode()⽅法。
Transformations转换操作
Transformation Meaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.返回新的RDD,将fun函数都作⽤到原的RDD每个元素上
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
返回新的RDD,将fun函数作⽤到原RDD的每个元素上,其fun返回是true
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).类似map,但是把RDD中的⼀个或多个元素,拍平输出成⼀个集合中
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
类似于map操作,但作⽤在每个分区上;在T类型的RDD上运⾏,func必须是 Iterator<T> => Iterator<U> 类型。
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
与mapPartition类似,但增加了⼀个索引,在 T 类型的 RDD 上运⾏
时,func必须是 (Int, Iterator<T>) => Iterator<U> 类型。
sample(withReplacement, fraction, seed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回⼀个新数据集,其中包含两个数据集的并集。
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
返回⼀个新数据集,其中包含两个数据集的交集。
distinct([numPartitions]))Return a new dataset that contains the distinct elements of the source dataset.
单类型RDD操作,返回⼀个新的RDD包含两个数据集的差集
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论