4--Master资源调度算法原理剖析与源码分析
Master是通过schedule⽅法进⾏资源调度,告知worker启动executor等。
⼀ schedule⽅法
/**
* 前⾯分析的都是怎样将资源,如worker、executor、Application等加⼊到各⾃的等待队列中(失败完成异常等等).
* 在等待的应⽤程序中调度当前可⽤的资源。
* 此⽅法将被调⽤-->每次⼀个新的应⽤程序连接或可⽤资源改变的时候。
*
* Master上⾯最重要的部分-->Master资源调度算法(其实就是在worker上⾯启动Executor)
* 1 判断master状态,只有alive状态的master才可以进⾏资源调度,standby是不能够调度的
* 2 将可⽤的worker节点打乱,这样有利于driver的负载均衡
* 3 进⾏driver资源调度,遍历处于等待状态的driver队列,发起driver
* 4 在worker上开启executor进程
*/
private def schedule(): Unit = {
// 判断Master的状态
// 只有alive状态的master才可以进⾏资源调度,standby是不能够调度的
if (state != RecoveryState.ALIVE) { return }
// 将可⽤的worker节点打乱,这样有利于driver的均衡
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
// 进⾏driver资源调度,遍历处于等待状态的driver队列
for (driver <- waitingDrivers) {
// 判断worker的可使⽤内存是否⼤于driver所需要的内存以及worker可使⽤cpu核数是否⼤于driver所需要的cpu核数
if (Free >= && sFree >= s) {
// 满⾜条件发起driver
launchDriver(worker, driver)
// 将当前driver从等待队列中移除
waitingDrivers -= driver
}
}
}
// 在worker上开启executor进程
startExecutorsOnWorkers()
}
1.1 Random.shuffle是⼀个java的⽅法。
Random.shuffle的原理是:
对传⼊集合中的元素进⾏随机的打乱,取出workers中所有注册上来的worker,⾸先进⾏过滤,必须保证(作为参数)传⼊的worker的状态都是alive 的,然后, 对于状态为alive的worker,调⽤shuffle⽅法进⾏打乱。
将worker存⼊到ArrayBuffer中并赋值给buf.
swap函数: 是将索引位置上的Worker两两进⾏交换.
For循环:从buf中最后⼀个元素开始循环,⼀直到索引为3,其中的nextInt是取0到n-1的随机数,然后调⽤swp()函数,将n-1和k进⾏交换,这样执⾏结束后,buf中的Worker顺序完全被打乱了
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
val buf = new ArrayBuffer[T] ++= xs
def swap(i1: Int, i2: Int) {
val tmp = buf(i1)
buf(i1) = buf(i2)
buf(i2) = tmp
}
for (n <- buf.length to 2 by -1) {
val k = nextInt(n)
swap(n - 1, k)
}
(bf(xs) ++= buf).result()
}
1.2 launchDriver 发起driver
⾸先调度driver-->优先级⾼于Application 为什么先调度driver?
其实,只有提交⽅式为cluster的时候,才会注册driver,然后调度driver
standalone和client都是在本地启动driver,⽽不会来注册driver,更不⽤说调度driver了
/**
* 判断某个worker上⾯有driver所需的⾜够资源来启动相应的Executor
* @param worker
* @param driver
*/
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
/
/ 提⽰信息
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// 将driver的信息加⼊到要为其启动Executor的workerInfo中
worker.addDriver(driver)
driver.worker = Some(worker)
// 向worker发送LaunchDriver消息
// 将driver的状态修改为RUNNING
driver.state = DriverState.RUNNING
}
⼆ startExecutorsOnWorkers  在worker上启动executor进程
/
**
* Schedule and launch executors on workers
* 在worker上开启executor进程
*/
private def startExecutorsOnWorkers(): Unit = {
// 遍历处于等待状态的application,且处于等待的状态的application的所需要的cpu核数⼤于0
// coresLeft = app请求的核数-已经分配给executor的核数的和
for (app <- waitingApps sLeft > 0) {
// 每⼀个executor所需要的核数
val coresPerExecutor: Option[Int] = sPerExecutor
// 过滤出有效的可⽤worker
/
/ 再从worker中过滤出worker剩余内存和CPU核数不⼩于app对应executor所需要的内存和CPU核数
// 按照剩余的CPU核数反向排序woker
val usableWorkers = Array.filter(_.state == WorkerState.ALIVE)
.filter(worker => Free >= PerExecutorMB &&
.sortBy(_.coresFree).reverse
// 在可⽤的worker上调度executor,启动executor有两种算法模式:
// ⼀:将应⽤程序尽可能多的分配到不同的worker上:spreadOutApps(平均分配)
// ⼆:和第⼀种相反,分配到尽可能少的worker上,通常⽤于计算密集型;⾮spreadOutApps(有点按需分配的意思)
// 每⼀个executor所需要的核数是可以配置的,⼀般来讲如果worker有⾜够的内存和CPU核数,同⼀个应⽤程序就可以
// 在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// 在可⽤的worker上分配资源给executor
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
三 scheduleExecutorsOnWorkers在每⼀个worker上调度资源
判断该worker能不能分配⼀个或者多个executor,能则分配相对应的executor所需要的CPU核数.
关于这两种资源调度的策略:spreadOutApps其实就是在可⽤的Worker列表中,逐个遍历,在每个worker上⾯平均的启动相应的Executor, ⽐如,现在需要启动10个Executor,但是第⼀次判断有5个可⽤(有空闲的资源)的worker节点,那么第⼀趟遍历的时候,会在每个Worker节点上⾯都启动⼀个Executor,然后会进⾏第⼆次判断,因为可能在第⼀次分配之后某个节点就没有空闲的资源了,所以需要再次的判断,然后进⾏第⼆次的遍历分配,这⾥我们假设在第⼆趟遍历的时候,这5个worker依然有⾜够的空闲资源,那么在第⼆趟的遍历之后,每个Worker节点上⾯都为这个app启动了2个Executor(平均分配); ⾮ spreadOutApps其实有两重的含义,在之前的版本中,这种⽅式含义是,在第⼀判断之后,在有可⽤的Worker节点上⾯选取空闲资源最多的呢个节点,尽可能的在这个节点上⾯启动app所需的所有Executor,如果资源不够的话,会进⾏在⼀次的判断,选取空闲资源最多的Worker继续启动(不是太合理).现在是按照assignedExecutors变量,可以指定在可⽤的Worker节点上⾯分别启动相应个数的Executor,(按需分配). ⾄于这两种资源分配的⽅式是由 if (spreadOutApps) { keepScheduling = false } 这个判断条件来判断的
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
/
/ 资源调度的两种策略:spreadOutApps和⾮spreadOutApps
spreadOutApps: Boolean): Array[Int] = {
// 如果我们指定executor需要分配的核数,coresPerExecutor表⽰executor所需要的cpu核数
免费平台源码资源网val coresPerExecutor = sPerExecutor
// app中每个executor所需要的最⼩cpu核数,如果没有默认最⼩核数为1
val minCoresPerExecutor = OrElse(1)
// 如果我们没有指定executor需要分配的核数,则⼀个worker上只能启动⼀个executor(平均分配)
val oneExecutorPerWorker = coresPerExecutor.isEmpty
// 每⼀个executor所需要的内存
val memoryPerExecutor = PerExecutorMB
// 获取可⽤worker数量
val numUsable = usableWorkers.length
// 构建⼀个可⽤worker长度的数组,⽤于存放每个worker节点可⽤于分配的cpu核数(16,16,16,16)
val assignedCores = new Array[Int](numUsable)
// 构建⼀个可⽤worker长度的数组,⽤于存放每⼀个worker上新分配的executor数量(1,2,1,0)
val assignedExecutors = new Array[Int](numUsable)
// 针对当前应⽤程序,还需要分配的cpu核数,它应该是application还需要的cpu核数和worker总共剩余核数之和
// 两者之间较⼩的那个
// 防⽌超过当前可⽤的cpu核数
var coresToAssign = math.sLeft, usableWorkers.map(_.coresFree).sum)
// 判断我们是否可以为这个application在指定的worker上发起⼀个executor
def canLaunchExecutor(pos: Int): Boolean = {
// 判断当前需要分配的cpu核数是否⼤于或者等于每个executor所需要的cpu核数,
// ⽐如总共只能分配8核,但是
// 每个executor所需要的cpu核数是12,那么就不能发起executor了,因为资源不够⽤
val keepScheduling = coresToAssign >= minCoresPerExecutor
// 当前worker剩余的核数 - 应⽤程序分配到该worker上的核数是否满⾜发起⼀个executor,
// ⽐如现在worker剩余核数16
// 然后⼜给application他分配了12核,即还剩4核可⽤,
// 但是启动⼀个executor需要12核,那么4 < 12 表⽰内核不⾜使⽤了
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// 如果我们允许每⼀个worker启动多个executor,然后我们就可以启动⼀个新的executor
// 否则如果worker已经启动⼀个新executor,只需要将更多的内核分配给该executor即可
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
// 如果需要发起新的executor,既需要判断cpu核数是否⾜够,还需要判断 executor是否超过限制总数以及内存是否⾜够if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + utors.size < utorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// 否则只是对已经存在的executor添加cpu核数,没必要检查内存和executor限制
keepScheduling && enoughCores
}
}
// 过滤出那些可⽤的worker节点
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (Empty) {
// 遍历每⼀个空闲的worker
freeWorkers.foreach { pos =>
var keepScheduling = true
// 检测当前worker是否能够发起executor
while (keepScheduling && canLaunchExecutor(pos)) {
// 需要分配的核数减去每个executor所需要的最⼩核数
coresToAssign -= minCoresPerExecutor
// 对应的worker节点需要分配的cpu核数加上要启动该executor所需要的最⼩CPU核数
assignedCores(pos) += minCoresPerExecutor
// 如果每⼀个worker只允许启动⼀个executor,那么该worker启动的executor数量只能是1,否则应该加⼀个
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// 如果需要将executor分配到更多的worker,那么就不再从当前worker节点继续分配,⽽是从下⼀个worker上继续分配
// 其实就是说如果是spreadOutApps⽅式的话,尽可能将Executor在更多的Worker上⾯启动-->平均分配
/
/ 如果spreadOutApps是false的话,不会将 keepScheduling置为false,其实就是按照提前设定的assignedExecutors变量,在每个worker上⾯启动相应个数的Executor
if (spreadOutApps) {
keepScheduling = false
}
}
}
// 因为进⾏了⼀次分配,需要再次从可⽤的worker节点中过滤可⽤的worker节点
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
四 allocateWorkerResourceToExecutors在worker上分配具体的资源
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// 获取该worker应该有多少个executor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
// 获取每⼀个executor应该分配的核数,如果没有指定则使⽤计算的应该分配的核数
val coresToAssign = OrElse(assignedCores)
for (i <- 1 to numExecutors) {
/
/ 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数
val exec = app.addExecutor(worker, coresToAssign)
// 启动executor
launchExecutor(worker, exec)
// 更新application的状态
app.state = ApplicationState.RUNNING
}
}
五 launchExecutor发起executor
/**
* launchExecutor发起executor
* @param worker-->WorkerInfo
* @param exec-->ExecutorDesc
*/
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
// worker启动executor,并且更新worker的cpu和内存信息
worker.addExecutor(exec)
exec.application.id, exec.id, exec.application.desc, s, ))
// 向application发送ExecutorAdded消息
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, s, ))
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。