为了对接公司内部的4层负载均衡,我们决定自己写一套节点挂载控制器,然后先分析了现有的endpointController相关实现。具体如下。
New
所有的controller都是在NewControllerInitializers
中初始化的,这里就是startEndpointController
,
1
|
controllers["endpoint"] = startEndpointController
|
再进入startEndpointController
分析,
1
2
3
4
5
6
7
8
9
10
|
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Endpoints(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
return nil, true, nil
}
|
这里会监听三种资源Pod,Service,Endpoint,在NewEndpointController
中实现了事件处理函数的初始化,
先看Service资源的事件处理方式,
1
2
3
4
5
6
7
8
9
|
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {
e.onServiceUpdate(cur)
},
DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynce
|
先看onServiceUpdate
的处理逻辑,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (e *EndpointController) onServiceUpdate(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
e.queue.Add(key)
}
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (e *EndpointController) onServiceDelete(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
e.serviceSelectorCache.Delete(key)
e.queue.Add(key)
}
|
这里会把service的selector缓存起来,然后再把service对应的key进行入队列。这里更新的cache会在pod事件里面用到。在onServiceDelete
事件处理中会删除对应的label缓存,并将service入队列。
再看pod事件处理,
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
|
这里会做两件事情,在UpdatePod
中也是类似的操作。
- 从cache中查找匹配pod的label的service;
- 把符合要求的service入队列。
Run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.queue.ShutDown()
klog.Infof("Starting endpoint controller")
defer klog.Infof("Shutting down endpoint controller")
if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
}
go func() {
defer utilruntime.HandleCrash()
e.checkLeftoverEndpoints()
}()
<-stopCh
}
|
在这里看e.worker函数,e.checkLeftoverEndpoints
函数是处理孤儿endpoint,对应的service删除了,但是ep还存在。针对用来做高可用的ep会通过control-plane.alpha.kubernetes.io/leader
的label进行过滤。
然后看主逻辑syncService
,这个函数稍微有点长。
这个函数主要逻辑有:
- 先从缓存里Get Service,如果不存在了,则删除对应的endpoint;
- 根据service的label把所有相关的pod都找到
e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
- 查看service是否定义了
TolerateUnreadyEndpointsAnnotation
,这个将决定是否要把notReady
的pod也加入到endpoint;
- 把pod的address转换成endpointAddress;
- 创建endpointSubset;
- 获取当前service对应的endpoint,如果存在则把新生成的和当前的进行比较看是否有变化
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets)&&apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels)
,如果没有变化则直接返回。然后再根据是否需要创建新的endpoint来进行新建或者更新操作。
1
2
3
4
5
6
7
|
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
}
|
其中在创建subset的步骤中会根据pod是否ready来决定是否要将pod加入到ready队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
subsets = append(subsets, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{epa},
Ports: ports,
})
readyEps++
} else if shouldPodBeInEndpoints(pod) {
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Ports: ports,
})
notReadyEps++
}
|
这里的IsPodReady
函数会获取pod的Ready类型的Condition,如果Ready类型的Condition为false
则会判断整个pod为notReady
状态,那么将不会把该pod添加到endpoint。关于pod什么时候会是Ready状态,会在pod的状态管理部分处理,主要是根据containerReady和Readiness Gate(这个是在1.11加入的新功能,1.14GA)两部分来决定,本文不会深入分析这个部分,在后续的状态管理的文章将进一步分析这个问题。这里的Readiness Gate功能也将为我们所用,主要是为了让用户自行决定一个pod是否Ready,能否正常提供服务。
到这里endpointController这个控制器都分析完了。如果有想了解细节的还需要各位看官自行阅读源码了。