// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod stays in the cache before it is expired.
// "stop" is the channel whose closure stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
    cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
    cache.run()
    return cache
}
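cache.run() starts the background goroutine referenced in the comment above; in this version of the scheduler it is essentially a wait.Until loop that calls the cleanup function every cleanAssumedPeriod until stop is closed. A minimal standalone sketch of that pattern (the one-second period and the printout are illustrative, not the scheduler's actual values):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    period := 1 * time.Second // stands in for cleanAssumedPeriod

    // wait.Until runs the function, sleeps for `period`, and repeats until stop is closed.
    go wait.Until(func() {
        fmt.Println("cleaning up expired assumed pods at", time.Now())
    }, period, stop)

    time.Sleep(3 * time.Second)
    close(stop) // closing stop terminates the background goroutine
}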
// cleanupAssumedPods exists for making tests deterministic by taking time as an input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    defer cache.updateMetrics()

    // The size of assumedPods should be small.
    // Walk every assumed pod and decide whether it has expired.
    for key := range cache.assumedPods {
        ps, ok := cache.podStates[key]
        if !ok {
            klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        // Skip pods whose binding has not finished yet.
        if !ps.bindingFinished {
            klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.", ps.pod.Namespace, ps.pod.Name)
            continue
        }
        // Pods that are still assumed after their deadline are expired and removed from the cache.
        if now.After(*ps.deadline) {
            klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
            if err := cache.expirePod(key, ps); err != nil {
                klog.Errorf("ExpirePod failed for %s: %v", key, err)
            }
        }
    }
}
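The deadline checked above is set when binding finishes: the cache roughly records now.Add(ttl) as the pod's expiry, so an assumed pod that is never confirmed by a real Add event gets dropped once the TTL elapses. A simplified sketch of that bookkeeping (field and function names here are illustrative, not the cache's exact code):

package main

import (
    "fmt"
    "time"
)

// podState mirrors the idea of the scheduler cache's per-pod bookkeeping:
// once binding finishes, a deadline of now+ttl is recorded.
type podState struct {
    bindingFinished bool
    deadline        *time.Time
}

func finishBinding(ps *podState, now time.Time, ttl time.Duration) {
    dl := now.Add(ttl)
    ps.bindingFinished = true
    ps.deadline = &dl
}

func expired(ps *podState, now time.Time) bool {
    return ps.bindingFinished && now.After(*ps.deadline)
}

func main() {
    ps := &podState{}
    start := time.Now()
    finishBinding(ps, start, 30*time.Second)

    fmt.Println(expired(ps, start.Add(10*time.Second))) // false: still within the ttl
    fmt.Println(expired(ps, start.Add(31*time.Second))) // true: past the deadline, would be expired
}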
var sched *Scheduler
source := options.schedulerAlgorithmSource
switch {
case source.Provider != nil:
    // Create the config from a named algorithm provider.
    sc, err := configurator.createFromProvider(*source.Provider)
    if err != nil {
        return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
    }
    sched = sc
case source.Policy != nil:
    // Create the config from a user specified policy source.
    policy := &schedulerapi.Policy{}
    switch {
    case source.Policy.File != nil:
        if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
            return nil, err
        }
    case source.Policy.ConfigMap != nil:
        if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
            return nil, err
        }
    }
    // Set extenders on the configurator now that we've decoded the policy.
    // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
    // which would have set extenders in the above instantiation of Configurator from CC options).
    configurator.extenders = policy.Extenders
    sc, err := configurator.createFromConfig(*policy)
    if err != nil {
        return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
    }
    sched = sc
default:
    return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// createFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    r := algorithmprovider.NewRegistry()
    defaultPlugins, exist := r[providerName]
    if !exist {
        return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
    }

    for i := range c.profiles {
        prof := &c.profiles[i]
        plugins := &schedulerapi.Plugins{}
        plugins.Append(defaultPlugins)
        plugins.Apply(prof.Plugins)
        prof.Plugins = plugins
    }
    return c.create()
}
// create a scheduler from a set of registered plugins.
func (c *Configurator) create() (*Scheduler, error) {
    var extenders []core.SchedulerExtender
    var ignoredExtendedResources []string
    if len(c.extenders) != 0 {
        var ignorableExtenders []core.SchedulerExtender
        for ii := range c.extenders {
            klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii])
            extender, err := core.NewHTTPExtender(&c.extenders[ii])
            if err != nil {
                return nil, err
            }
            if !extender.IsIgnorable() {
                extenders = append(extenders, extender)
            } else {
                ignorableExtenders = append(ignorableExtenders, extender)
            }
            for _, r := range c.extenders[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
                }
            }
        }
        // place ignorable extenders to the tail of extenders
        extenders = append(extenders, ignorableExtenders...)
    }

    // If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
    // This should only have an effect on ComponentConfig v1alpha2, where it is possible to configure Extenders and
    // plugin args (and in which case the extender ignored resources take precedence).
    // For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only
    // plugin config for this plugin.
    if len(ignoredExtendedResources) > 0 {
        for i := range c.profiles {
            prof := &c.profiles[i]
            prof.PluginConfig = append(prof.PluginConfig,
                frameworkplugins.NewPluginConfig(
                    noderesources.FitName,
                    noderesources.FitArgs{IgnoredResources: ignoredExtendedResources},
                ),
            )
        }
    }

    profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
    if err != nil {
        return nil, fmt.Errorf("initializing profiles: %v", err)
    }
    if len(profiles) == 0 {
        return nil, errors.New("at least one profile is required")
    }

    // Profiles are required to have equivalent queue sort plugins.
    lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
    podQueue := internalqueue.NewSchedulingQueue(
        lessFn,
        internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
        internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
    )

    // Setup cache debugger.
    debugger := cachedebugger.New(
        c.informerFactory.Core().V1().Nodes().Lister(),
        c.podInformer.Lister(),
        c.schedulerCache,
        podQueue,
    )
    debugger.ListenForSignal(c.StopEverything)

    algo := core.NewGenericScheduler(
        c.schedulerCache,
        podQueue,
        c.nodeInfoSnapshot,
        extenders,
        c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
        GetPodDisruptionBudgetLister(c.informerFactory),
        c.disablePreemption,
        c.percentageOfNodesToScore,
        c.enableNonPreempting,
    )

    return &Scheduler{
        SchedulerCache:  c.schedulerCache,
        Algorithm:       algo,
        Profiles:        profiles,
        NextPod:         internalqueue.MakeNextPodFunc(podQueue),
        Error:           MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
        StopEverything:  c.StopEverything,
        VolumeBinder:    c.volumeBinder,
        SchedulingQueue: podQueue,
    }, nil
}
type PriorityQueue struct {
    stop  chan struct{}
    clock util.Clock

    // pod initial backoff duration (1s by default).
    podInitialBackoffDuration time.Duration
    // pod maximum backoff duration (10s by default).
    podMaxBackoffDuration time.Duration

    lock sync.RWMutex
    cond sync.Cond

    // activeQ is heap structure that scheduler actively looks at to find pods to
    // schedule. Head of heap is the highest priority pod.
    activeQ *heap.Heap
    // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podBackoffQ *heap.Heap
    // unschedulableQ holds pods that have been tried and determined unschedulable.
    unschedulableQ *UnschedulablePodsMap
    // nominatedPods is a structure that stores pods which are nominated to run
    // on nodes.
    nominatedPods *nominatedPodMap
    // schedulingCycle represents sequence number of scheduling cycle and is incremented
    // when a pod is popped.
    schedulingCycle int64
    // moveRequestCycle caches the sequence number of scheduling cycle when we
    // received a move request. Unschedulable pods in and before this scheduling
    // cycle will be put back to activeQueue if we were trying to schedule them
    // when we received move request.
    moveRequestCycle int64

    // closed indicates that the queue is closed.
    // It is mainly used to let Pop() exit its control loop while waiting for an item.
    closed bool
}
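The schedulingCycle / moveRequestCycle pair closes a race: if a cluster event (a "move request") arrives while a pod is in the middle of a scheduling attempt, that pod should not be parked in unschedulableQ, because the event it was waiting for may already have happened. The decision comes down to a cycle-number comparison, roughly as in this simplified sketch (not the queue's exact code):

package main

import "fmt"

type queue struct {
    schedulingCycle  int64 // incremented every time a pod is popped for scheduling
    moveRequestCycle int64 // cycle at which the last move request was received
}

// requeueTarget decides where a pod that failed scheduling should go, based on
// whether a move request arrived during (or after) the cycle that popped it.
func (q *queue) requeueTarget(podSchedulingCycle int64) string {
    if q.moveRequestCycle >= podSchedulingCycle {
        return "backoffQ/activeQ" // a relevant event may have occurred; retry soon
    }
    return "unschedulableQ" // nothing changed; wait for the next move request
}

func main() {
    q := &queue{}

    q.schedulingCycle++ // pod popped at cycle 1
    podCycle := q.schedulingCycle
    fmt.Println(q.requeueTarget(podCycle)) // unschedulableQ: no move request yet

    q.moveRequestCycle = q.schedulingCycle // e.g. a node was added while the pod was being scheduled
    fmt.Println(q.requeueTarget(podCycle)) // backoffQ/activeQ: retry, the new node might fit
}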
type Heap struct {
    // data stores objects and has a queue that keeps their ordering according
    // to the heap invariant.
    data *data
    // metricRecorder updates the counter when elements of a heap get added or
    // removed, and it does nothing if it's nil
    metricRecorder metrics.MetricRecorder
}

type data struct {
    // items is a map from key of the objects to the objects and their index.
    // We depend on the property that items in the map are in the queue and vice versa.
    items map[string]*heapItem
    // queue implements a heap data structure and keeps the order of elements
    // according to the heap invariant. The queue keeps the keys of objects stored
    // in "items".
    queue []string

    // keyFunc is used to make the key used for queued item insertion and retrieval, and
    // should be deterministic.
    keyFunc KeyFunc
    // lessFunc is used to compare two objects in the heap.
    lessFunc lessFunc
}
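The items/queue split is what lets the scheduler both look a pod up by key and pop the highest-priority one. A minimal illustration of the same idea on top of the standard library's container/heap (the types and keys below are made up for the example, not the scheduler's internal API):

package main

import (
    "container/heap"
    "fmt"
)

type item struct {
    key      string
    priority int
    index    int // position of the item's key in the heap-ordered queue
}

// keyedHeap keeps a lookup map alongside a heap-ordered slice of keys,
// mirroring the items/queue split in the scheduler's Heap type.
type keyedHeap struct {
    items map[string]*item
    queue []string
}

func (h *keyedHeap) Len() int { return len(h.queue) }
func (h *keyedHeap) Less(i, j int) bool {
    return h.items[h.queue[i]].priority > h.items[h.queue[j]].priority
}
func (h *keyedHeap) Swap(i, j int) {
    h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
    h.items[h.queue[i]].index = i
    h.items[h.queue[j]].index = j
}
func (h *keyedHeap) Push(x interface{}) {
    it := x.(*item)
    it.index = len(h.queue)
    h.items[it.key] = it
    h.queue = append(h.queue, it.key)
}
func (h *keyedHeap) Pop() interface{} {
    key := h.queue[len(h.queue)-1]
    h.queue = h.queue[:len(h.queue)-1]
    it := h.items[key]
    delete(h.items, key)
    return it
}

func main() {
    h := &keyedHeap{items: map[string]*item{}}
    heap.Push(h, &item{key: "default/pod-a", priority: 10})
    heap.Push(h, &item{key: "default/pod-b", priority: 100})
    heap.Push(h, &item{key: "default/pod-c", priority: 50})

    fmt.Println(heap.Pop(h).(*item).key)         // default/pod-b: highest priority first
    fmt.Println(h.items["default/pod-c"] != nil) // true: lookup by key still works
}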
// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.PodInfo) bool {
    p1 := pod.GetPodPriority(pInfo1.Pod)
    p2 := pod.GetPodPriority(pInfo2.Pod)
    return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
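Concretely: higher priority always wins, and among equal priorities the pod that entered the queue earlier is scheduled first. A tiny standalone illustration of the same predicate, using plain structs instead of framework.PodInfo:

package main

import (
    "fmt"
    "time"
)

type podInfo struct {
    priority  int32
    timestamp time.Time
}

// less mirrors PrioritySort.Less: higher priority wins, earlier timestamp breaks ties.
func less(a, b podInfo) bool {
    return a.priority > b.priority ||
        (a.priority == b.priority && a.timestamp.Before(b.timestamp))
}

func main() {
    now := time.Now()
    high := podInfo{priority: 100, timestamp: now}
    low := podInfo{priority: 10, timestamp: now.Add(-time.Hour)} // older, but lower priority
    older := podInfo{priority: 100, timestamp: now.Add(-time.Minute)}

    fmt.Println(less(high, low))   // true: priority dominates age
    fmt.Println(less(older, high)) // true: equal priority, earlier timestamp goes first
}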
func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pInfo := p.newPodInfo(pod)
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
        return err
    }
    if p.unschedulableQ.get(pod) != nil {
        klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
        p.unschedulableQ.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.nominatedPods.add(pod, "")
    p.cond.Broadcast()
    return nil
}

func (p *PriorityQueue) Pop() (*framework.PodInfo, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {
        // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
        // When Close() is called, the p.closed is set and the condition is broadcast,
        // which causes this loop to continue and return from the Pop().
        if p.closed {
            return nil, fmt.Errorf(queueClosed)
        }
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    pInfo := obj.(*framework.PodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    return pInfo, err
}
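Pop's blocking behavior is a standard sync.Cond pattern: the consumer waits while activeQ is empty, and Add (or Close) broadcasts to wake it up. A minimal sketch of that pattern in isolation (a plain string slice stands in for activeQ):

package main

import (
    "fmt"
    "sync"
    "time"
)

type blockingQueue struct {
    lock   sync.Mutex
    cond   *sync.Cond
    items  []string
    closed bool
}

func newBlockingQueue() *blockingQueue {
    q := &blockingQueue{}
    q.cond = sync.NewCond(&q.lock)
    return q
}

func (q *blockingQueue) Add(item string) {
    q.lock.Lock()
    defer q.lock.Unlock()
    q.items = append(q.items, item)
    q.cond.Broadcast() // wake any goroutine blocked in Pop
}

func (q *blockingQueue) Pop() (string, error) {
    q.lock.Lock()
    defer q.lock.Unlock()
    for len(q.items) == 0 {
        if q.closed {
            return "", fmt.Errorf("queue is closed")
        }
        q.cond.Wait() // releases the lock while waiting, reacquires it on wake-up
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item, nil
}

func main() {
    q := newBlockingQueue()
    go func() {
        time.Sleep(100 * time.Millisecond)
        q.Add("default/nginx")
    }()
    item, _ := q.Pop() // blocks until Add broadcasts
    fmt.Println("popped", item)
}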
func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    for {
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            return
        }
        pod := rawPodInfo.(*framework.PodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
        if boTime.After(p.clock.Now()) {
            return
        }
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
            return
        }
        p.activeQ.Add(rawPodInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        defer p.cond.Broadcast()
    }
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
    for _, pInfo := range podInfoList {
        pod := pInfo.Pod
        if p.isPodBackingoff(pInfo) {
            // The pod is still backing off, so move it to the backoff queue.
            if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
            } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
                p.unschedulableQ.delete(pod)
            }
        } else {
            if err := p.activeQ.Add(pInfo); err != nil {
                klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
            } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
                p.unschedulableQ.delete(pod)
            }
        }
    }
    // Record in the PriorityQueue that a move request was handled in the current scheduling cycle.
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}
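Whether a pod is "still backing off" reduces to an exponential backoff window: each failed attempt roughly doubles the wait, starting at podInitialBackoffDuration (1s by default) and capped at podMaxBackoffDuration (10s by default). A simplified sketch of that calculation (not the queue's exact helper):

package main

import (
    "fmt"
    "time"
)

// backoffDuration doubles the initial backoff per attempt, capped at max,
// mirroring how the scheduling queue computes a pod's backoff window.
func backoffDuration(attempts int, initial, max time.Duration) time.Duration {
    d := initial
    for i := 1; i < attempts; i++ {
        d *= 2
        if d > max {
            return max
        }
    }
    return d
}

func main() {
    initial, max := 1*time.Second, 10*time.Second
    for attempts := 1; attempts <= 6; attempts++ {
        fmt.Printf("attempt %d -> backoff %v\n", attempts, backoffDuration(attempts, initial, max))
    }
    // attempt 1 -> 1s, 2 -> 2s, 3 -> 4s, 4 -> 8s, 5 and later -> 10s (capped)
}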
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }
    trace.Step("Basic checks done")

    if err := g.snapshot(); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if g.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    // Run "prefilter" plugins.
    preFilterStatus := prof.RunPreFilterPlugins(ctx, state, pod)
    if !preFilterStatus.IsSuccess() {
        return result, preFilterStatus.AsError()
    }
    trace.Step("Running prefilter plugins done")

    startPredicateEvalTime := time.Now()
    filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:                   pod,
            NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
            FilteredNodesStatuses: filteredNodesStatuses,
        }
    }

    // Run "prescore" plugins.
    prescoreStatus := prof.RunPreScorePlugins(ctx, state, pod, filteredNodes)
    if !prescoreStatus.IsSuccess() {
        return result, prescoreStatus.AsError()
    }
    trace.Step("Running prescore plugins done")

    metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(filteredNodesStatuses),
            FeasibleNodes:  1,
        }, nil
    }

    priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
    if err != nil {
        return result, err
    }

    metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    host, err := g.selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
        FeasibleNodes:  len(filteredNodes),
    }, err
}
func (g *genericScheduler) podPassesFiltersOnNode(
    ctx context.Context,
    prof *profile.Profile,
    state *framework.CycleState,
    pod *v1.Pod,
    info *schedulernodeinfo.NodeInfo,
) (bool, *framework.Status, error) {
    var status *framework.Status

    podsAdded := false
    // We run filters twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
    // If all filters succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // filters such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // filters fails, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // the current "pod" must yield to those pods and must not take a space opened
    // for running them. It is ok if the current "pod" takes resources freed up by
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: filters like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while filters like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        stateToUse := state
        nodeInfoToUse := info
        if i == 0 {
            var err error
            podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
            if err != nil {
                return false, nil, err
            }
        } else if !podsAdded || !status.IsSuccess() {
            break
        }

        statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
        status = statusMap.Merge()
        if !status.IsSuccess() && !status.IsUnschedulable() {
            return false, status, status.AsError()
        }
    }

    return status.IsSuccess(), status, nil
}
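To make the two-pass rule concrete: pass one runs the filters with equal-or-higher-priority nominated pods added to the node, pass two runs them without, and the node is feasible only if both passes succeed (the second pass is skipped when nothing was added). A stripped-down sketch of just that control flow, with booleans standing in for the real filter plugins:

package main

import "fmt"

// filterFn reports whether the pod fits a given view of the node
// (with or without nominated pods added).
type filterFn func(withNominated bool) bool

// passesTwice mirrors the two-pass rule: run filters with nominated pods added;
// if that succeeds and pods were actually added, run them again without.
func passesTwice(hasNominated bool, filter filterFn) bool {
    if !filter(true) { // first pass: nominated pods treated as running
        return false
    }
    if !hasNominated {
        return true // nothing was added, a second pass would be identical
    }
    return filter(false) // second pass: nominated pods treated as absent
}

func main() {
    // An inter-pod affinity rule that is only satisfied by a nominated pod:
    // it passes the first pass but fails the second, so the node is rejected.
    affinityOnNominated := func(withNominated bool) bool { return withNominated }
    fmt.Println(passesTwice(true, affinityOnNominated)) // false

    // Enough resources even with the nominated pods counted: passes both passes.
    plentyOfRoom := func(withNominated bool) bool { return true }
    fmt.Println(passesTwice(true, plentyOfRoom)) // true
}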
func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
    if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
        // This may happen only in tests.
        return false, state, nodeInfo, nil
    }
    nominatedPods := g.schedulingQueue.NominatedPodsForNode(nodeInfo.Node().Name)
    if len(nominatedPods) == 0 {
        return false, state, nodeInfo, nil
    }
    nodeInfoOut := nodeInfo.Clone()
    stateOut := state.Clone()
    podsAdded := false
    for _, p := range nominatedPods {
        if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
            nodeInfoOut.AddPod(p)
            status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
            if !status.IsSuccess() {
                return false, state, nodeInfo, status.AsError()
            }
            podsAdded = true
        }
    }
    return podsAdded, stateOut, nodeInfoOut, nil
}
func (g *genericScheduler) prioritizeNodes(
    ctx context.Context,
    prof *profile.Profile,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(g.extenders) == 0 && !prof.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return framework.NodeScoreList{}, scoreStatus.AsError()
    }

    // Summarize all scores.
    result := make(framework.NodeScoreList, 0, len(nodes))
    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(g.extenders) != 0 && nodes != nil {
        var mu sync.Mutex
        var wg sync.WaitGroup
        combinedScores := make(map[string]int64, len(nodes))
        for i := range g.extenders {
            if !g.extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
                    wg.Done()
                }()
                prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10) {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
            // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
            result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
        }
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Name, result[i].Score)
        }
    }
    return result, nil
}
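The scaling at the end is plain arithmetic: extenders score in the 0–MaxExtenderPriority range (10) while the framework scores in 0–MaxNodeScore (100), so an extender's weighted score is multiplied by 100/10 = 10 before being added. A small worked example with illustrative numbers:

package main

import "fmt"

const (
    maxNodeScore        = 100 // framework.MaxNodeScore
    maxExtenderPriority = 10  // extenderv1.MaxExtenderPriority
)

func main() {
    pluginScore := int64(73)  // summed score from the score plugins, already on the 0-100 scale
    extenderScore := int64(7) // weighted score an extender returned, on the 0-10 scale

    scaled := extenderScore * (maxNodeScore / maxExtenderPriority)
    total := pluginScore + scaled
    fmt.Printf("plugin=%d extender=%d scaled=%d total=%d\n", pluginScore, extenderScore, scaled, total)
    // plugin=73 extender=7 scaled=70 total=143
}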
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
    if len(nodeScoreList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }
    maxScore := nodeScoreList[0].Score
    selected := nodeScoreList[0].Name
    cntOfMaxScore := 1
    for _, ns := range nodeScoreList[1:] {
        if ns.Score > maxScore {
            maxScore = ns.Score
            selected = ns.Name
            cntOfMaxScore = 1
        } else if ns.Score == maxScore {
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                // Replace the candidate with probability of 1/cntOfMaxScore
                selected = ns.Name
            }
        }
    }
    return selected, nil
}
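The tie-breaking in selectHost is reservoir sampling with a reservoir of size one: the n-th node seen with the current max score replaces the candidate with probability 1/n, which makes every tied node equally likely to be chosen. A standalone sketch that demonstrates the uniformity with plain ints in place of NodeScoreList:

package main

import (
    "fmt"
    "math/rand"
)

// pickMax selects one index among the maximum scores uniformly at random,
// using the same 1/cntOfMaxScore replacement rule as selectHost.
func pickMax(scores []int64) int {
    maxScore := scores[0]
    selected := 0
    cntOfMaxScore := 1
    for i, s := range scores[1:] {
        if s > maxScore {
            maxScore = s
            selected = i + 1
            cntOfMaxScore = 1
        } else if s == maxScore {
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                selected = i + 1
            }
        }
    }
    return selected
}

func main() {
    scores := []int64{90, 100, 100, 100, 42} // three nodes tie at 100
    counts := make(map[int]int)
    for i := 0; i < 30000; i++ {
        counts[pickMax(scores)]++
    }
    fmt.Println(counts) // indices 1, 2, and 3 each win roughly a third of the time
}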