从spark.default.parallelism参数来看Spark并⾏度、并⾏计算
任务概念
1 并⾏度概念理解
并⾏度:并⾏度= partition= task总数。但是同⼀时刻能处理的task数量由并⾏计算任务决定(CPU cores决定)。
并⾏度(Parallelism)指的是分布式数据集被划分为多少份,从⽽⽤于分布式计算。换句话说,并⾏度的 出发点是数据(从数据的⾓度看),它明确了数据划分的粒度。并⾏度越⾼,数据的粒度越细,数据分⽚越 多,数据越分散。由此可见,像分区数量、分⽚数量、Partitions 这些概念都是并⾏度的 同义词。并⾏度本质上指的就是总分区数。也就是我们通常看到的spark.default.parallelism 和spark.sql.shuffle.partitions 这两个参数。这两个参数,spark.default.parallelism 是针对RDD设置的,spark.sql.shuffle.partitions 是针对数据框(df,ds)设置的。
并⾏计算任务(Paralleled Tasks)则不同,它指的是在任⼀时刻整个集能够同时计算的任务数量(从资源的视⾓,受计算机集资源制约)。换句话说,它的出发点是计算任务、是 CPU,由与 CPU 有关的三个参数共同决定。具体说来, Executor 中并⾏计算任务数的上限是 s 与 spark.t
ask.cpus 的商,暂 且记为 #Executor-tasks,整个集的并⾏计算任务数⾃然就是 #Executor-tasks 乘以集 内 Executors 的数量,记为 #Executors。因此,最终的数值是:#Executor-tasks *#Executors。
我们不难发现,并⾏度决定了数据粒度,数据粒度决定了分区⼤⼩,分区⼤⼩则决定着每 个计算任务的内存消耗。在同⼀个 Executor 中,多个同时运⾏的计算任务“基本上”是平 均⽠分可⽤内存的,每个计算任务能获取到的内存空间是有上限的,因此并⾏计算任务数 会反过来制约并⾏度的设置。
⼀般来说,集分区(task总数)数要⼤于Paralleled Tasks。
spark.default.parallelism:官⽅定义
Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.:如果⽤户没有指定参数,则返回像join,reducebykey,parallelize等RDD默认分区数量
默认情况:
For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD.:对于像reducebykey,join等分布式shuffle操作过程中,返回其⽗RDD最⼤分区数量。
For operations like parallelize with no parent RDDs, it depends on the cluster manager:对于像 没有⽗RDD的parallelize算⼦操作,其取决于集管理器。
spark.default.parallelism本质上设置的是每⼀阶段开始时的分区数,例如在mapstage阶段中类似于makeRDD,parallelize等创建RDD的时候,也就是获取数据源的时候,指定分区数,如果这些初始化算⼦中传⼊了分区数的参数,则优先使⽤传⼊的参数,没指定值,则读取spark.default.parallelism的值作为分区数,如果这两个值都未指定,则根据算⼦默认的计算⽅式进⾏计算,如读取hdfs默认以blk个数为分区,读取hbase则默认以region个数为分区。当有join,reducebykey等操作,此时便有了shuffle操作,会划分阶段,⼀份为⼆,此时map 阶段的数据需要落盘,落盘的数据相当于reduce阶段的数据源,reduce阶段分区数取决于:如果join,reducebykey参数中传⼊了分区值,则以该参数作为分区数,如果没有传⼊任何参数,设置了spark.default.parallelism则读取该值作为reduce阶段的分区数,如果这两个参数都没有设置则以join,reducebykey等算⼦的⽗RDD分区数作为reduce阶段的分区数。可以将join,reducebykey算⼦等理解为reduce阶段的开始。同样reduce阶段输出⽂件数量也是该参数决定的,这⼀点类似于mr的过程mr的分区数量也是由⽤户⾃⼰设置的reduce个数来决定的,spark本质上是mr的优化过程。对于没有⽗RDD的RDD,⽐如通过加载HDFS上的数据⽣成的RDD,它的分区数由InputFormat切分机制决定。⼀般情况下就是⼀个HDFS block块对应⼀个分区,对于不可切分⽂件则⼀个⽂件对应⼀个分区,与spark.default.parallelism值之间的关系具体细节还需继续看源码研究
对于SparkSql如果设置了spark.default.parallelism值,那么其分区是如何计算的呢?。
SparkSql读取hive表时数据分区规则。
不分桶的情况下:spark 通过FileSourceScanExec 来处理hdfs⽂件,源码如下:
/** 基础表table_a不为分桶表,读取数据的分区⽅式⾛此⽅法*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
/**defaultMaxSplitBytes 即为spark.sql.files.maxPartitionBytes 参数,默认为128M*/
val defaultMaxSplitBytes =
fsRelation.f.filesMaxPartitionBytes
/
**openCostInBytes 即为spark.sql.files.openCostInBytes 参数,默认为4M*/
val openCostInBytes = fsRelation.f.filesOpenCostInBytes
/**defaultParallelism 并⾏度参数即 spark.default.parallelism */
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
/**分⽚⽅法的计算公式*/
/**openCostInBytes与bytesPerCore取最⼤,然后再与defaultMaxSplitBytes取最⼩*/
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
/
**遍历⽂件*/
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
/**判断⽂件是否⽀持分割,parquet可分割*/
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, Path)) {
/**依据分⽚⼤⼩maxSplitBytes计算要多少分区来处理数据*/
(0L Len by maxSplitBytes).map { offset =>
val remaining = Len - offset
/**假如剩余量不⾜,那么该⽂件剩余的作为⼀个分区*/
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, String, offset, size, hosts)
}
} else {
/**判断⽂件是否⽀持分割,如果不能分割,⼀个⽂件⼀个partition*/
val hosts = getBlockHosts(blockLocations, 0, Len)
Seq(PartitionedFile(
partition.values, String, 0, Len, hosts))
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
.....
分区计算公式如下:
bytesPerCore = totalBytes / defaultParallelism
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
defaultMaxSplitBytes: 官⽅解释:spark.sql.files.maxPartitionBytes, the default is 128M, the maximum amount of data read by each partition。即:每个分区读取的最⼤值,默认是128M
openCostInBytes: spark.sql.files.openCostInBytes, the default is 4M, files smaller than this size will be merged into one partition, which can be understood as the minimum amount of each partition, avoiding a lot of fragmentation tasks caused by broken files.
默认是4M,⽂件⼤⼩⼩于该值的时候将会合并到⼀个分区,该参数可以理解为每个分区的最⼩量,⽬的是避免由碎⽂件造成的⼤量碎⽚任务。
defaultParallelism: spark.default.parallelism, yarn defaults to the number of applied cores or 2. 等于
spark.default.parallelism,yarn默认为应⽤cores数量或2。
bytesPerCore: total data size / defaultParallelism。 数据总⼤⼩/设置的并⾏度值。
例如:读取数据⼤⼩为2048M
partitionSize(分区⼤⼩)的计算过程是⽐较简单的,实际上,会先对读⼊的每个分区按maxSplitBytes做切割,如果切割完后⼩⽂件⼤⼩⽐maxSplitBytes的⼤⼩还⼩就会被合并到⼀个partition,直到该⽂件的⼤⼩ > maxSplitBytes。
session如何设置和读取/ / If spark.default.parallelism is set to 1000, the final number of partitions is 512, each partition size is 4M
bytesPerCore=2048/1000=2.048M (按照设置的并⾏度,求得的每个分区的⼤⼩)
maxSplitBytes = Math.min(128M, Math.max(4M, 2.048M))=4M (根据规则实际算出每个分区⼤⼩)
partitionSize = 2048 / 4 = 512 (分区数)
/ / If spark.default.parallelism is set to 100, the final number of partitions is 100, each partition size is 20.48M
bytesPerCore=2048/100=20.48M (按照设置的并⾏度,求得的每个分区的⼤⼩)
maxSplitBytes = Math.min(128M, Math.max(4M, 20.48M))
partitionSize = 2048 / 20.48 = 100 (最终算出的分区数为100和实际设置的并⾏度⼀致)
/ / If spark.default.parallelism is set to 10, the final number of partitions is 16, each partition size is 128M
bytesPerCore=2048/10=204.8M (按照设置的并⾏度,求得的每个分区的⼤⼩)
maxSplitBytes = Math.min(128M, Math.max(4M, 204.8M))
partitionSize = 2048 / 128 = 16
通过上⾯分析可以看出,spark读取hive表等设置了spark.default.parallelism会有不⽣效的情况,那么如果想要增加分区,即task 数量,要怎么做呢?此时就要降低最终分⽚ maxSplitBytes的值,可以直接通过降低spark.sql.files.maxPartitionBytes 的值来降低 maxSplitBytes 的值。
分桶的情况下:
分区数量取决于分桶数
HiveTableScanExec
Partition by number of files and size.
Eg. 读取2048M数据, hdfs block size 设置为128M,并⾏度设置为1000
(1)如果牡蛎⼒有1000个⼩⽂件则有1000个分区数
(2)如果只有⼀个⽂件则会产⽣2048/16个分区
(3)如果⼀个较⼤⽂件为1024M,剩余999个⽂件总⼤⼩为1024M,则会产⽣1024M/128M=8+999=1007个分区
具体测试如下:
测试2:设置并⾏度为10。代码如下
object dep {
def main(args: Array[String]): Unit = {
// Spark 并⾏度测试。设置spark.default.parallelism为10
val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount").set("spark.default.parallelism", "10") val sc = new SparkContext(sparkConf)
// TODO : new ParallelCollectionRDD.测试1不传⼊任何分区参数值
val rdd = sc.makeRDD(List(
"hello scala", "hello spark"
))
DebugString)
println("------------------")
// TODO : new MapPartitionsRDD -> new ParallelCollectionRDD
val wordRDD = rdd.flatMap(
string => {
string.split(" ")
}
)
DebugString)
println("------------------")
// TODO : new MapPartitionsRDD -> new MapPartitionsRDD
val mapRDD = wordRDD.map(
word => (word, 1)
)
DebugString)
println("------------------")
// TODO : new ShuffledRDD -> new MapPartitionsRDD
// reduceByKey参数中不传⼊分区值
val reduceRDD: RDD[(String, Int)] = duceByKey( _ + _ )
DebugString)
println("------------------")
llect().mkString(","))
sc.stop()
}
}
测试结果如下:
(10) ParallelCollectionRDD[0] at makeRDD at dep.scala:15 []
------------------
(10) MapPartitionsRDD[1] at flatMap at dep.scala:22 []
| ParallelCollectionRDD[0] at makeRDD at dep.scala:15 []
------------------
(10) MapPartitionsRDD[2] at map at dep.scala:31 []
| MapPartitionsRDD[1] at flatMap at dep.scala:22 []
| ParallelCollectionRDD[0] at makeRDD at dep.scala:15 []
------------------
(10) ShuffledRDD[3] at reduceByKey at dep.scala:39 []
+-(10) MapPartitionsRDD[2] at map at dep.scala:31 []
| MapPartitionsRDD[1] at flatMap at dep.scala:22 []
| ParallelCollectionRDD[0] at makeRDD at dep.scala:15 []
------------------
前⾯的10代表分区数,可以看到在创建RDD时候,读取了默认配置值,返回10个分区数,shuffle过程中shuffleRDD将所有的依赖关系全都展⽰了出来,依赖关系如下:
ShuffledRDD -> MapPartitionsRDD-> MapPartitionsRDD-> ParallelCollectionRDD。由于默认配置值为10因⽽ShuffledRDD分区数为10
测试3:设置并⾏度为10,makeRDD传⼊分区值为8。代码如下
object dep {
def main(args: Array[String]): Unit = {
// Spark 并⾏度测试。设置spark.default.parallelism为10
val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount").set("spark.default.parallelism", "10")
val sc = new SparkContext(sparkConf)
// TODO : new ParallelCollectionRDD.测试1不传⼊任何分区参数值
val rdd = sc.makeRDD(List(
"hello scala", "hello spark"
),8)
DebugString)
println("------------------")
// TODO : new MapPartitionsRDD -> new ParallelCollectionRDD
val wordRDD = rdd.flatMap(
string => {
string.split(" ")
}
)
DebugString)
println("------------------")
// TODO : new MapPartitionsRDD -> new MapPartitionsRDD
val mapRDD = wordRDD.map(
word => (word, 1)
)
DebugString)
println("------------------")
/
/ TODO : new ShuffledRDD -> new MapPartitionsRDD
// reduceByKey参数中不传⼊分区值
val reduceRDD: RDD[(String, Int)] = duceByKey( _ + _ )
DebugString)
println("------------------")
llect().mkString(","))
sc.stop()
}
}
测试结果如下
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论