前面分析完调度和抢占过程,接下来将分析后面的工作。在完成前面两个步骤之后,将会进行assumePod
,reserve
,permit
,bind
等操作。
首先检查pod绑定的volume是否完成绑定,
1
2
3
4
5
6
7
|
// Optimistically assume the pod's volumes on the suggested host before
// binding. On error, record the pod as a scheduling failure, bump the
// error metric, and abort this scheduling cycle.
// allBound reports whether every volume is already bound (presumably
// consumed by the later bind phase — not visible in this excerpt).
allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
metrics.PodScheduleErrors.Inc()
return
}
|
在这里会先检查所有的pvc是否都已经完成了binding及provision操作,然后通过UpdateBinding
操作完成最后的更新。
然后进行RunReservePlugins
和assume
操作,在assume中,会先将assumepod.Spec.NodeName
字段更新为当前节点名,然后调用SchedulerCache.AssumePod将其写入调度缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// AssumePod optimistically records the pod in the scheduler cache ahead of
// the real binding, so later scheduling cycles already account for its
// resource usage. It fails if the pod is already tracked by the cache.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := schedulernodeinfo.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	// Refuse to assume a pod the cache already knows about.
	if _, exists := cache.podStates[key]; exists {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}

	// Charge the pod against its node, then remember it as assumed so it
	// can be reconciled (or expired) once the real binding event arrives.
	cache.addPod(pod)
	state := &podState{pod: pod}
	cache.podStates[key] = state
	cache.assumedPods[key] = true
	return nil
}
|
然后删除SchedulingQueue
中的assumePod。
执行Permit
的plugin,在这里会有延迟绑定pod操作,coscheduling的相关实现就是在这个插件做文章。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// RunPermitPlugins runs the configured permit plugins for pod on nodeName.
// Each plugin may approve, reject, or ask the pod to wait for a bounded
// time. A nil return means every plugin approved; a Wait status means the
// pod was parked as a waitingPod until allowed or timed out.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
	startTime := time.Now()
	// The deferred metric reads the named result, so it observes whatever
	// status is ultimately returned (a nil *Status reports its zero code).
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
	}()

	waitTimes := make(map[string]time.Duration)
	outcome := Success
	for _, pl := range f.permitPlugins {
		st, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
		if st.IsSuccess() {
			continue
		}
		switch {
		case st.IsUnschedulable():
			// A single rejection fails the whole permit phase.
			msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), st.Message())
			klog.V(4).Infof(msg)
			return NewStatus(st.Code(), msg)
		case st.Code() == Wait:
			// Cap each plugin's requested wait at the framework maximum
			// and remember it; keep running the remaining plugins.
			if timeout > maxTimeout {
				timeout = maxTimeout
			}
			waitTimes[pl.Name()] = timeout
			outcome = Wait
		default:
			msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, st.Message())
			klog.Error(msg)
			return NewStatus(Error, msg)
		}
	}

	if outcome == Wait {
		// At least one plugin asked to wait and none rejected: park the
		// pod with one timeout per requesting plugin.
		wp := newWaitingPod(pod, waitTimes)
		f.waitingPods.add(wp)
		msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
		klog.V(4).Infof(msg)
		return NewStatus(Wait, msg)
	}
	return nil
}
|
在这里会调用每个实现了PermitPlugin
的插件,如果返回不是Success
或Wait
则表示异常。如果返回是Wait
则会根据返回的时间去创建一个pluginsWaitTime
,在这里记录每个插件需要等待的时间。然后将等待的pod添加到framework。
1
2
3
4
5
6
7
|
// When at least one permit plugin returned Wait and none rejected the pod,
// wrap it in a waitingPod (one timeout timer per plugin), register it on
// the framework, and surface the Wait status to the scheduling cycle.
if statusCode == Wait {
waitingPod := newWaitingPod(pod, pluginsWaitTime)
f.waitingPods.add(waitingPod)
msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
klog.V(4).Infof(msg)
return NewStatus(Wait, msg)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// newWaitingPod builds a waitingPod for pod and arms one timer per permit
// plugin; a timer that fires rejects the pod for exceeding that plugin's
// maximum wait time.
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
	wp := &waitingPod{
		pod: pod,
		// Allow() and Reject() must never block. The one-slot buffer
		// guarantees a non-blocking send is preserved even when the
		// receive happens only after the send.
		s: make(chan *Status, 1),
	}
	wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))

	// Every timer callback invokes wp.Reject, which iterates over
	// pendingPlugins. Hold the lock until this constructor returns so no
	// callback can observe a partially populated map.
	wp.mu.Lock()
	defer wp.mu.Unlock()
	for name, d := range pluginsMaxWaitTime {
		// Copy the loop variables so each closure captures its own pair
		// (required for Go versions before 1.22 per-iteration scoping).
		plugin, waitTime := name, d
		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
			msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
				waitTime, plugin)
			wp.Reject(msg)
		})
	}
	return wp
}
|
最后会将waitingPod添加到framework的WaitingPod。如果前面的结果返回异常则会将assumePod从cache中删掉,并执行UnreservePlugins
。
Binding Cycle
这一部分的代码量比较少,直接启动一个协程去执行整个操作过程。这里会先执行一个阻塞函数prof.WaitOnPermit
,在这里会一直等待waitingPod的返回状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// WaitOnPermit blocks until the permit plugins have decided the fate of
// the given pod. It returns nil when the pod was never waiting or was
// allowed, otherwise a status describing the rejection or error.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
	wp := f.waitingPods.get(pod.UID)
	if wp == nil {
		// No plugin asked this pod to wait.
		return nil
	}
	defer f.waitingPods.remove(pod.UID)
	klog.V(4).Infof("pod %q waiting on permit", pod.Name)

	startTime := time.Now()
	// Block here until a plugin's Allow/Reject (or a timeout timer)
	// delivers the decision over the waitingPod's channel.
	s := <-wp.s
	metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

	if s.IsSuccess() {
		return nil
	}
	if s.IsUnschedulable() {
		msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
		klog.V(4).Infof(msg)
		return NewStatus(s.Code(), msg)
	}
	msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
	klog.Error(msg)
	return NewStatus(Error, msg)
}
|
在这里具体返回结果需要看插件中Permit
操作怎么实现的,我们可以简单看一下coscheduling
怎么实现的批调度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// Permit implements the "Permit" extension point. Pods belonging to a
// PodGroup are held in Wait until at least minAvailable members have been
// bound or assumed; the pod that completes the quorum releases every
// waiting sibling via Allow.
func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	pgInfo, _ := cs.getOrCreatePodGroupInfo(pod, time.Now())
	if len(pgInfo.key) == 0 {
		// Not part of any PodGroup: nothing to coordinate.
		return framework.NewStatus(framework.Success, ""), 0
	}

	ns := pod.Namespace
	groupName := pgInfo.name
	quorum := pgInfo.minAvailable

	// boundCount covers both assigned and assumed pods of the group. The
	// current pod is absent from the snapshot during this scheduling
	// cycle, hence the +1.
	boundCount := cs.calculateBoundPods(groupName, ns)
	total := boundCount + 1

	if total < quorum {
		klog.V(3).Infof("The count of podGroup %v/%v/%v is not up to minAvailable(%d) in Permit: current(%d)",
			pod.Namespace, groupName, pod.Name, quorum, total)
		// TODO Change the timeout to a dynamic value depending on the size of the `PodGroup`
		return framework.NewStatus(framework.Wait, ""), time.Duration(cs.args.PermitWaitingTimeSeconds) * time.Second
	}

	klog.V(3).Infof("The count of PodGroup %v/%v/%v is up to minAvailable(%d) in Permit: current(%d)",
		pod.Namespace, groupName, pod.Name, quorum, total)
	// Quorum reached: release every sibling currently parked at Permit.
	cs.frameworkHandle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		p := wp.GetPod()
		if p.Namespace == ns && p.Labels[PodGroupName] == groupName {
			klog.V(3).Infof("Permit allows the pod: %v/%v", groupName, p.Name)
			wp.Allow(cs.Name())
		}
	})
	return framework.NewStatus(framework.Success, ""), 0
}
|
在coscheduling
代码实现,通过calculateBoundPods
计算每个groupName在整个集群的pod数量有多少,是否已经满足最小的需求,如果是则通过waitingPod.Allow
来通知前面的channel
完成调度,否则则会返回Wait
状态码。
完成了Permit操作之后还需要检查是否所有的volume都已经完成绑定,如果没有还需要再进行绑定,这里会直接调用k8s的API进行相关的绑定操作。
接下来进入PreBind
,Bind
, PostBind
阶段。