前面分析完调度和抢占过程,接下来将分析后面的工作。在完成前面两个步骤之后,将会进行assumePod,reserve,permit,bind等操作。
首先检查pod所使用的volume是否都已经完成绑定:

1
2
3
4
5
6
7
// Assume the volumes of assumedPod on the host suggested by the scheduling result.
// Per the surrounding text, allBound reports whether every volume of the pod is
// already bound — TODO confirm against VolumeBinder.AssumePodVolumes.
allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		// Record the failure as a SchedulerError, count it in the
		// PodScheduleErrors metric, and stop handling this pod.
		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
			fmt.Sprintf("AssumePodVolumes failed: %v", err))
		metrics.PodScheduleErrors.Inc()
		return
	}

在这里会先检查所有的pvc是否都已经完成了binding及provision操作,然后通过UpdateBinding操作完成最后的更新。
然后进行RunReservePlugins和assume操作。在assume中,会先将assumePod的Spec.NodeName字段更新为当前Node,然后更新SchedulerCache,即下面的AssumePod部分。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// AssumePod adds pod to the cache and marks it as assumed.
//
// It returns an error when the pod key cannot be derived, or when a state entry
// for that key already exists in the cache.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	podKey, keyErr := schedulernodeinfo.GetPodKey(pod)
	if keyErr != nil {
		return keyErr
	}

	// The existence check and the three updates below happen under one lock.
	cache.mu.Lock()
	defer cache.mu.Unlock()

	if _, exists := cache.podStates[podKey]; exists {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", podKey)
	}

	cache.addPod(pod)
	cache.podStates[podKey] = &podState{pod: pod}
	cache.assumedPods[podKey] = true
	return nil
}

然后删除SchedulingQueue中的assumePod。
接着执行Permit插件。Permit可以让pod进入等待状态,从而延迟绑定,coscheduling的批调度就是基于这个扩展点实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// RunPermitPlugins runs every configured permit plugin for pod on nodeName and
// combines their results.
//
// It returns:
//   - nil when every plugin reports success;
//   - a status carrying the plugin's own code as soon as a plugin's status
//     satisfies IsUnschedulable (remaining plugins are not run);
//   - an Error status as soon as a plugin returns any other code that is
//     neither success nor Wait;
//   - a Wait status when at least one plugin asked to wait and no plugin
//     rejected the pod. In that case the pod is added to f.waitingPods together
//     with the per-plugin timeouts, each capped at maxTimeout.
//
// The deferred call observes the duration of the whole extension point,
// labelled with the code of the returned status.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
	}()
	pluginsWaitTime := make(map[string]time.Duration)
	statusCode := Success
	for _, pl := range f.permitPlugins {
		// Use a distinct name so the per-plugin result does not shadow the
		// named return value read by the deferred metric call.
		pluginStatus, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
		if pluginStatus.IsSuccess() {
			continue
		}
		if pluginStatus.IsUnschedulable() {
			msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), pluginStatus.Message())
			// msg is already formatted and embeds the plugin's message; log it
			// verbatim so that a '%' in it is not treated as a format verb.
			klog.V(4).Info(msg)
			return NewStatus(pluginStatus.Code(), msg)
		}
		if pluginStatus.Code() != Wait {
			msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message())
			klog.Error(msg)
			return NewStatus(Error, msg)
		}
		// Not allowed to be greater than maxTimeout.
		if timeout > maxTimeout {
			timeout = maxTimeout
		}
		pluginsWaitTime[pl.Name()] = timeout
		statusCode = Wait
	}
	if statusCode == Wait {
		waitingPod := newWaitingPod(pod, pluginsWaitTime)
		f.waitingPods.add(waitingPod)
		msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
		klog.V(4).Info(msg)
		return NewStatus(Wait, msg)
	}
	return nil
}

在这里会依次调用每个实现了PermitPlugin的插件。如果返回的状态既不是Success也不是Wait,则直接返回:Unschedulable表示pod被插件拒绝,其他状态码则表示异常。如果返回的是Wait,则会把插件返回的超时时间(最大不超过maxTimeout)记录到pluginsWaitTime中,也就是记录每个插件需要等待的时间。最后将等待的pod添加到framework的waitingPods中。

1
2
3
4
5
6
7
  // At least one permit plugin asked to wait: create a waitingPod with the
  // recorded per-plugin wait times, add it to f.waitingPods, and report Wait.
  if statusCode == Wait {
		waitingPod := newWaitingPod(pod, pluginsWaitTime)
		f.waitingPods.add(waitingPod)
		msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
		// msg is already formatted, so log it verbatim instead of passing it
		// as a format string.
		klog.V(4).Info(msg)
		return NewStatus(Wait, msg)
	}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  // newWaitingPod builds a waitingPod for pod and starts one timer per entry of
  // pluginsMaxWaitTime. When a timer fires, the pod is rejected with a message
  // naming the plugin and the time waited.
  func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
  	wp := &waitingPod{
  		pod: pod,
  		// Allow() and Reject() must never block, which is achieved with a
  		// non-blocking send on this channel. The buffer of size 1 ensures such
  		// a send is not dropped when the receive happens only afterwards.
  		s: make(chan *Status, 1),
  	}
  	wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))

  	// The timer callbacks call wp.Reject, which iterates over pendingPlugins.
  	// Holding the lock until this function returns ensures a callback can
  	// only run once the map is fully populated.
  	wp.mu.Lock()
  	defer wp.mu.Unlock()

  	// Create a timeout-rejection callback for each plugin, based on that
  	// plugin's wait time.
  	for name, limit := range pluginsMaxWaitTime {
  		// Per-iteration copies, so each closure captures its own values.
  		plugin, waitTime := name, limit
  		onTimeout := func() {
  			wp.Reject(fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
  				waitTime, plugin))
  		}
  		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, onTimeout)
  	}

  	return wp
  }

最后会将waitingPod添加到framework的waitingPods中。如果前面的步骤返回异常,则会将assumePod从cache中删掉,并执行UnreservePlugins。

Binding Cycle

这一部分的代码量比较少,直接启动一个协程去执行整个操作过程。这里会先执行一个阻塞函数prof.WaitOnPermit,在这里会一直等待waitingPod的返回状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// WaitOnPermit blocks until a status arrives for pod on its waitingPod channel.
//
// If the pod is not registered in f.waitingPods it returns nil immediately.
// Otherwise the pod is removed from f.waitingPods when the function returns,
// and the time spent waiting is observed in the PermitWaitDuration metric,
// labelled with the received status code.
//
// It returns nil when the received status is a success, a status carrying the
// received code when that status satisfies IsUnschedulable, and an Error status
// for any other outcome.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
	waitingPod := f.waitingPods.get(pod.UID)
	if waitingPod == nil {
		return nil
	}
	defer f.waitingPods.remove(pod.UID)
	klog.V(4).Infof("pod %q waiting on permit", pod.Name)

	startTime := time.Now()
	// Block here until the permit decision for this pod is delivered.
	s := <-waitingPod.s
	metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

	if !s.IsSuccess() {
		if s.IsUnschedulable() {
			msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
			// msg is already formatted and embeds the received message; log it
			// verbatim so that a '%' in it is not treated as a format verb.
			klog.V(4).Info(msg)
			return NewStatus(s.Code(), msg)
		}
		msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
		klog.Error(msg)
		return NewStatus(Error, msg)
	}
	return nil
}

在这里具体返回结果需要看插件中Permit操作怎么实现的,我们可以简单看一下coscheduling怎么实现的批调度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Permit is invoked by the framework at the "Permit" extension point.
//
// A pod whose pod-group key is empty is permitted immediately. Otherwise the
// number of bound pods of the group plus the current pod is compared with the
// group's minAvailable: below it, the pod is told to wait for
// PermitWaitingTimeSeconds; at or above it, every waiting pod of the same
// namespace and group is allowed and the current pod is permitted.
func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	// The second return value of getOrCreatePodGroupInfo is not used here.
	pgInfo, _ := cs.getOrCreatePodGroupInfo(pod, time.Now())
	if len(pgInfo.key) == 0 {
		return framework.NewStatus(framework.Success, ""), 0
	}

	ns, group, required := pod.Namespace, pgInfo.name, pgInfo.minAvailable
	// The bound count includes both assigned and assumed Pods. It comes from
	// the snapshot, which does not contain the current pod during this
	// scheduling cycle, hence the +1.
	total := cs.calculateBoundPods(group, ns) + 1

	if total < required {
		klog.V(3).Infof("The count of podGroup %v/%v/%v is not up to minAvailable(%d) in Permit: current(%d)",
			pod.Namespace, group, pod.Name, required, total)
		// TODO Change the timeout to a dynamic value depending on the size of the `PodGroup`
		waitFor := time.Duration(cs.args.PermitWaitingTimeSeconds) * time.Second
		return framework.NewStatus(framework.Wait, ""), waitFor
	}

	klog.V(3).Infof("The count of PodGroup %v/%v/%v is up to minAvailable(%d) in Permit: current(%d)",
		pod.Namespace, group, pod.Name, required, total)

	// Allow every waiting pod that belongs to the same namespace and pod group.
	allowGroupMember := func(waitingPod framework.WaitingPod) {
		if waitingPod.GetPod().Namespace != ns || waitingPod.GetPod().Labels[PodGroupName] != group {
			return
		}
		klog.V(3).Infof("Permit allows the pod: %v/%v", group, waitingPod.GetPod().Name)
		waitingPod.Allow(cs.Name())
	}
	cs.frameworkHandle.IterateOverWaitingPods(allowGroupMember)

	return framework.NewStatus(framework.Success, ""), 0
}

coscheduling的实现:通过calculateBoundPods计算该PodGroup(同一namespace下的同名group)中已经绑定(包括已assume)的pod数量,再加上当前pod,判断是否已经达到minAvailable。如果已经达到,则对同组中正在等待的pod逐个调用waitingPod.Allow,通知前面阻塞在channel上的WaitOnPermit继续执行,完成调度;否则返回Wait状态码以及等待的超时时间。
完成了Permit操作之后还需要检查是否所有的volume都已经完成绑定,如果没有还需要再进行绑定,这里会直接调用k8s的API进行相关的绑定操作。
接下来进入PreBind、Bind、PostBind阶段。