Understanding Kubernetes Scheduler Extension in One Article
Preface
Scheduler is functionally and logically one of the simpler Kubernetes components. Its main job is to watch kube-apiserver for pods whose PodSpec.NodeName is empty, pick an optimal node for each such pod using the predicate and priority algorithms, and finally bind the pod to that node so that it gets scheduled to run there.
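The bind at the end of this flow is just an ordinary API call against kube-apiserver. A minimal sketch, assuming client-go (bindPod and its arguments are illustrative, not part of the scheduler source; newer client-go versions also pass a context and CreateOptions to Bind):

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// bindPod posts a Binding object, which is all the "bind" step amounts to.
func bindPod(clientset kubernetes.Interface, pod *v1.Pod, selectedNode string) error {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
        Target:     v1.ObjectReference{Kind: "Node", Name: selectedNode},
    }
    if err := clientset.CoreV1().Pods(pod.Namespace).Bind(binding); err != nil {
        return fmt.Errorf("failed to bind pod %s/%s: %v", pod.Namespace, pod.Name, err)
    }
    return nil
}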
Expanding the scheduler part of this call flow, the internal call details (for reference) are shown in the figure below:
The scheduler ships with many built-in predicate and priority algorithms. Predicates include NoDiskConflict, PodFitsResources, MatchNodeSelector, and CheckNodeMemoryPressure; priorities include LeastRequestedPriority, BalancedResourceAllocation, CalculateAntiAffinityPriority, and NodeAffinityPriority. In real production environments, however, we often need special scheduling policies, such as batch scheduling (a.k.a. coscheduling), that the default Kubernetes scheduling policies cannot satisfy; that is when we need to extend the scheduler.
Scheduler extension approaches
Kubernetes currently supports four ways to implement custom scheduling algorithms (predicates & priorities):
default-scheduler recoding: add your algorithms directly to the default Kubernetes scheduler, then recompile kube-scheduler
standalone: implement a custom scheduler parallel to kube-scheduler, running in the cluster either on its own or alongside the default kube-scheduler
scheduler extender: implement a "scheduler extender" that kube-scheduler calls out to (http/https) as a supplement to the default scheduling algorithms (predicates & priorities & bind)
scheduler framework: implement scheduler framework plugins and recompile kube-scheduler; similar to the first approach, but more standardized and plugin-based
The sections below walk through the principle of each approach and how to develop against it.
default-scheduler recoding
Let's first look at the scheduling-related entry points in kube-scheduler:
Setting the default predicate & priority policies
See defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/):
func init() {
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

func defaultPredicates() sets.String {
    return sets.NewString(
        predicates.NoVolumeZoneConflictPred,
        predicates.MaxEBSVolumeCountPred,
        predicates.MaxGCEPDVolumeCountPred,
        predicates.MaxAzureDiskVolumeCountPred,
        predicates.MaxCSIVolumeCountPred,
        predicates.MatchInterPodAffinityPred,
        predicates.NoDiskConflictPred,
        predicates.GeneralPred,
        predicates.PodToleratesNodeTaintsPred,
        predicates.CheckVolumeBindingPred,
        predicates.CheckNodeUnschedulablePred,
    )
}

func defaultPriorities() sets.String {
    return sets.NewString(
        priorities.SelectorSpreadPriority,
        priorities.InterPodAffinityPriority,
        priorities.LeastRequestedPriority,
        priorities.BalancedResourceAllocation,
        priorities.NodePreferAvoidPodsPriority,
        priorities.NodeAffinityPriority,
        priorities.TaintTolerationPriority,
        priorities.ImageLocalityPriority,
    )
}

func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

const (
    // DefaultProvider defines the default algorithm provider name.
    DefaultProvider = "DefaultProvider"
)
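As the comment above notes, the algorithm provider can be chosen at startup. In scheduler releases of this era that was done with the (since deprecated) --algorithm-provider flag, for example:

kube-scheduler --algorithm-provider=ClusterAutoscalerProvider

This provider swaps LeastRequestedPriority for MostRequestedPriority, which packs pods more tightly onto fewer nodes and therefore plays better with the cluster autoscaler.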
Registering the predicate and priority handler functions
Registering a predicate function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/):
func init() {
    ...
    // Fit is determined by resource availability.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}
Registering a priority function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/):
func init() {
    ...
    // Prioritizes nodes that have labels matching NodeAffinity
    scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}
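A custom priority follows the same Map/Reduce registration pattern. Another minimal sketch with made-up names (FastDiskPriority, example.com/fast-disk), assuming the framework.NodeScore-based PriorityMapFunction signature of this version:

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// FastDiskPriorityMap is a hypothetical Map function that favors nodes
// carrying an illustrative "fast disk" label.
func FastDiskPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    var score int64
    if node.Labels["example.com/fast-disk"] == "true" {
        score = framework.MaxNodeScore
    }
    return framework.NodeScore{Name: node.Name, Score: score}, nil
}

func init() {
    // nil reduce function: the raw map scores are used as-is; weight 1
    // matches the built-in registration above. The name must also be added
    // to defaultPriorities() for the DefaultProvider to run it.
    scheduler.RegisterPriorityMapReduceFunction("FastDiskPriority", FastDiskPriorityMap, nil, 1)
}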
Writing the predicate and priority handler functions
The predicate function behind PodFitsResourcesPred is as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/):
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    // No extended resources should be ignored by default.
    ignoredExtendedResources := sets.NewString()

    var podRequest *schedulernodeinfo.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
        podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
        if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
            ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
        }
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = GetResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

    if klog.V(10) {
        if len(predicateFails) == 0 {
            // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
            // not logged. There is visible performance gain from it.
            klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
        }
    }
    return len(predicateFails) == 0, predicateFails, nil
}
The Map and Reduce functions behind the NodeAffinityPriority priority (k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/) look like this:
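A condensed sketch of these two functions, paraphrased from the upstream source of this era (helper and field names may differ slightly across versions): the Map function sums, per node, the weights of PreferredDuringSchedulingIgnoredDuringExecution terms whose selector matches the node's labels, and the Reduce function normalizes the scores to [0, MaxNodeScore].

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// CalculateNodeAffinityPriorityMap scores one node against the pod's
// preferred node affinity terms.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NodeScore{}, fmt.Errorf("node not found")
    }
    affinity := pod.Spec.Affinity
    var count int32
    if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
        for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
            term := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
            if term.Weight == 0 {
                continue
            }
            nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(term.Preference.MatchExpressions)
            if err != nil {
                return framework.NodeScore{}, err
            }
            // a node collects the weight of every term it matches
            if nodeSelector.Matches(labels.Set(node.Labels)) {
                count += term.Weight
            }
        }
    }
    return framework.NodeScore{Name: node.Name, Score: int64(count)}, nil
}

// CalculateNodeAffinityPriorityReduce normalizes all node scores into the
// [0, MaxNodeScore] range.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)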
