The previous post covered the PreFilter, Filter, PreScore and Score phases of scheduling; this one looks at the Preempt phase.

Preemption is triggered when the scheduling cycle cannot find any suitable node for the pod:

node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
if err != nil {
  klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
  return "", err
}

The preemption phase also reuses the node snapshot taken during the scheduling cycle:

func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos(), g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	// List all PodDisruptionBudget objects in the cluster.
	var pdbs []*policy.PodDisruptionBudget
	if g.pdbLister != nil {
		pdbs, err = g.pdbLister.List(labels.Everything())
		if err != nil {
			return nil, nil, nil, err
		}
	}
	nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
}

During preemption the scheduler first checks whether the pod is eligible to preempt others at all. For example, if pod.Status.NominatedNodeName is not empty and a pod on that nominated node is already being deleted, no preemption happens; there are a few other conditions as well, such as the pod's preemption policy:

if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
	klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
	return false
}
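
The NominatedNodeName condition mentioned above is the remainder of the same function. Continuing podEligibleToPreemptOthers, it looks roughly like this (a simplified sketch rather than a verbatim copy; nodeInfos is the NodeInfoLister passed into the function):

// If the pod already nominated a node and a lower-priority pod on that node is
// terminating, assume an earlier preemption is still in flight and do not preempt again.
nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
	if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
		podPriority := podutil.GetPodPriority(pod)
		for _, p := range nodeInfo.Pods() {
			if p.DeletionTimestamp != nil && podutil.GetPodPriority(p) < podPriority {
				// There is a terminating pod on the nominated node.
				return false
			}
		}
	}
}
return true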

Next, nodesWherePreemptionMightHelp performs an initial screening of candidate nodes based on the results returned by the scheduling cycle: any node whose Filter status code is not UnschedulableAndUnresolvable still has a chance of benefiting from preemption.
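
A rough sketch of that filter, simplified from the upstream source:

// Keep only the nodes where removing pods could possibly make the preemptor fit.
// Nodes rejected as UnschedulableAndUnresolvable cannot be fixed by preemption.
func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *FitError) []*schedulernodeinfo.NodeInfo {
	var potentialNodes []*schedulernodeinfo.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}

The surviving candidates are then handed to selectNodesForPreemption, which evaluates them in parallel across multiple goroutines: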

func (g *genericScheduler) selectNodesForPreemption(
	ctx context.Context,
	prof *profile.Profile,
	state *framework.CycleState,
	pod *v1.Pod,
	potentialNodes []*schedulernodeinfo.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*extenderv1.Victims, error) {
	nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
	var resultLock sync.Mutex

	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[i].Clone()
		stateCopy := state.Clone()
		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
		if fits {
			resultLock.Lock()
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			nodeToVictims[potentialNodes[i].Node()] = &victims
			resultLock.Unlock()
		}
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
	return nodeToVictims, nil
}

As the function above shows, the real work happens in the anonymous function checkNode, which calls selectVictimsOnNode. Let's look at that function in detail. It first defines two helper closures, removePod and addPod (sketched after the list below), and victim selection then proceeds in the following steps:

  1. Find the pods on the node whose priority is lower than the preemptor's and run removePod on each of them; this also invokes the RemovePod extension of every plugin that implements it (by default interpodaffinity, podtopologyspread and serviceaffinity);
  2. Use podPassesFiltersOnNode to check whether the preemptor now fits on the node. This is the same function used during scheduling, and it likewise runs the filters twice: once with the nominated pods assumed on the node, and once with only the pods that are actually scheduled;
  3. Sort the candidate victims: first by pod priority, then by start time, with earlier-started pods ranked higher;
  4. Use filterPodsWithPDBViolation to work out which victims would violate a PodDisruptionBudget, splitting them into two slices, violatingPods and nonViolatingPods;
  5. Using the violatingPods and nonViolatingPods selected above, the reprievePod closure tries to keep as many of them as possible on the node;
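
For reference, the two closures from step 1 are small. A simplified sketch of them (based on the upstream selectVictimsOnNode, not a verbatim copy):

// removePod: take a pod out of the node's snapshot and notify every plugin that
// implements the PreFilter RemovePod extension so their cycle state stays consistent.
removePod := func(rp *v1.Pod) error {
	if err := nodeInfo.RemovePod(rp); err != nil {
		return err
	}
	status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
	if !status.IsSuccess() {
		return status.AsError()
	}
	return nil
}
// addPod: the inverse operation, used when a victim gets reprieved.
addPod := func(ap *v1.Pod) error {
	nodeInfo.AddPod(ap)
	status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
	if !status.IsSuccess() {
		return status.AsError()
	}
	return nil
}

Step 5's reprievePod closure builds on these two helpers to put a victim back and re-check the filters: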
reprievePod := func(p *v1.Pod) (bool, error) {
	// First add the previously removed pod back onto the node.
	if err := addPod(p); err != nil {
		return false, err
	}
	// Check whether the preemptor still fits with this pod restored.
	fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
	if !fits {
		// If not, remove the pod again ...
		if err := removePod(p); err != nil {
			return false, err
		}
		// ... and record it in the final slice of victims to delete.
		victims = append(victims, p)
		klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
	}
	return fits, nil
}
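
The closure is then applied to the two slices from step 4: the PDB-violating victims are reprieved first and the non-violating ones afterwards, so that as many pods as possible stay on the node. Roughly (a simplified sketch of the surrounding code in selectVictimsOnNode, where potentialVictims stands for the sorted list of lower-priority pods from steps 1-3):

violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
for _, p := range violatingVictims {
	if fits, err := reprievePod(p); err != nil {
		klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
		return nil, 0, false
	} else if !fits {
		// This pod has to be evicted even though it violates a PDB; count the violation.
		numViolatingVictim++
	}
}
// Now try to reprieve the victims that do not violate any PDB.
for _, p := range nonViolatingVictims {
	if _, err := reprievePod(p); err != nil {
		klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
		return nil, 0, false
	}
}
return victims, numViolatingVictim, true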

Once the built-in victim selection is done, any preemption logic that users have plugged in through extenders runs next; it filters the results computed above once more:

nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
	return nil, nil, nil, err
}

Finally, one node is picked from the remaining candidates. The rules, applied in order until a single node remains, are:

  1. The node whose victims violate the fewest PodDisruptionBudgets (see the sketch after this list);
  2. The node whose highest-priority victim has the lowest priority;
  3. If several nodes remain, the node with the smallest sum of victim priorities;
  4. If there is still a tie, the node with the fewest victims;
  5. As a last resort, the node whose highest-priority victims were started most recently, so the pods that have been running for the shortest time are the ones evicted.
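
Each rule narrows the candidate set and returns as soon as exactly one node is left. The first rule, for instance, looks roughly like this inside pickOneNodeForPreemption (a simplified sketch; the later rules repeat the same pattern on the surviving minNodes1 set):

// Keep only the candidate nodes whose victims violate the fewest PodDisruptionBudgets.
minNumPDBViolatingPods := int64(math.MaxInt32)
var minNodes1 []*v1.Node
for node, victims := range nodesToVictims {
	numPDBViolatingPods := victims.NumPDBViolations
	if numPDBViolatingPods < minNumPDBViolatingPods {
		minNumPDBViolatingPods = numPDBViolatingPods
		minNodes1 = nil
	}
	if numPDBViolatingPods == minNumPDBViolatingPods {
		minNodes1 = append(minNodes1, node)
	}
}
if len(minNodes1) == 1 {
	// Only one node has the minimum number of PDB violations; pick it.
	return minNodes1[0]
}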

At the end, getLowerPriorityNominatedPods collects the pods that have been nominated to the chosen node but have a lower priority than the preemptor, so that their nominations can be cleared and they get rescheduled:

func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := g.schedulingQueue.NominatedPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := podutil.GetPodPriority(pod)
	for _, p := range pods {
		if podutil.GetPodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}

That covers the core preemption logic; next let's look at what happens after a node has been chosen.

if node != nil {
  nodeName = node.Name
  // Update the scheduling queue with the nominated pod information. Without
  // this, there would be a race condition between the next scheduling cycle
  // and the time the scheduler receives a Pod Update for the nominated pod.
  // First update the node's nominated-pod information in the cache, recording the preemptor as nominated for this node.
  sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

  // Make a call to update nominated node name of the pod on the API server.
  // Update the pod's status by writing the node name into pod.Status.NominatedNodeName.
  err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
  if err != nil {
    klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
    sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
    return "", err
  }

  for _, victim := range victims {
    // Delete the victim pod through the Kubernetes API.
    if err := sched.podPreemptor.deletePod(victim); err != nil {
      klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
      return "", err
    }
    // If the victim is a WaitingPod, send a reject message to the PermitPlugin
    // If the victim is a waiting pod (parked at the Permit stage), stop the wait and reject it so it returns an Unschedulable status.
    if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
      waitingPod.Reject("preempted")
    }
    prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)

  }
  metrics.PreemptionVictims.Observe(float64(len(victims)))
}

Finally, the nominated pods that need cleaning up are processed: their pod.Status.NominatedNodeName is cleared via the Kubernetes client.

for _, p := range nominatedPodsToClear {
  rErr := sched.podPreemptor.removeNominatedNodeName(p)
  if rErr != nil {
    klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
    // We do not return as this error is not critical.
  }
}

That wraps up the whole preemption flow; I have also sketched the overall logic in a diagram for reference. The next post will cover other parts of the scheduling framework.