为了对接公司内部的4层负载均衡,我们决定自己写一套节点挂载控制器,然后先分析了现有的endpointController相关实现。具体如下。

New

所有的controller都是在NewControllerInitializers中初始化的,这里就是startEndpointController,

1
controllers["endpoint"] = startEndpointController

再进入startEndpointController分析,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
	go endpointcontroller.NewEndpointController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Services(),
		ctx.InformerFactory.Core().V1().Endpoints(),
		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
		ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
	).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
	return nil, true, nil
}

这里会监听三种资源Pod,Service,Endpoint,在NewEndpointController中实现了事件处理函数的初始化, 先看Service资源的事件处理方式,

1
2
3
4
5
6
7
8
9
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: e.onServiceUpdate,
	UpdateFunc: func(old, cur interface{}) {
		e.onServiceUpdate(cur)
	},
	DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynce

先看onServiceUpdate的处理逻辑,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (e *EndpointController) onServiceUpdate(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
	e.queue.Add(key)
}

// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (e *EndpointController) onServiceDelete(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	e.serviceSelectorCache.Delete(key)
	e.queue.Add(key)
}

这里会把service的selector缓存起来,然后再把service对应的key进行入队列。这里更新的cache会在pod事件里面用到。在onServiceDelete事件处理中会删除对应的label缓存,并将service入队列。 再看pod事件处理,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
	}
}

这里会做两件事情,在UpdatePod中也是类似的操作。

  1. 从cache中查找匹配pod的label的service;
  2. 把符合要求的service入队列。

Run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	klog.Infof("Starting endpoint controller")
	defer klog.Infof("Shutting down endpoint controller")

	if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

在这里看e.worker函数,e.checkLeftoverEndpoints函数是处理孤儿endpoint,对应的service删除了,但是ep还存在。针对用来做高可用的ep会通过control-plane.alpha.kubernetes.io/leader的label进行过滤。 然后看主逻辑syncService,这个函数稍微有点长。 这个函数主要逻辑有:

  1. 先从缓存里Get Service,如果不存在了,则删除对应的endpoint;
  2. 根据service的label把所有相关的pod都找到e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
  3. 查看service是否定义了TolerateUnreadyEndpointsAnnotation,这个将决定是否要把notReady的pod也加入到endpoint;
  4. 把pod的address转换成endpointAddress;
  5. 创建endpointSubset;
  6. 获取当前service对应的endpoint,如果存在则把新生成的和当前的进行比较看是否有变化apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets)&&apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels),如果没有变化则直接返回。然后再根据是否需要创建新的endpoint来进行新建或者更新操作。
1
2
3
4
5
6
7
if createEndpoints {
	// No previous endpoints, create them
	_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
} else {
	// Pre-existing
	_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
}

其中在创建subset的步骤中会根据pod是否ready来决定是否要将pod加入到ready队列中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
		subsets = append(subsets, v1.EndpointSubset{
			Addresses: []v1.EndpointAddress{epa},
			Ports:     ports,
		})
		readyEps++
	} else if shouldPodBeInEndpoints(pod) {
		klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
		subsets = append(subsets, v1.EndpointSubset{
			NotReadyAddresses: []v1.EndpointAddress{epa},
			Ports:             ports,
		})
		notReadyEps++
	}

这里的IsPodReady函数会获取pod的Ready类型的Condition,如果Ready类型的Condition为false则会判断整个pod为notReady状态,那么将不会把该pod添加到endpoint。关于pod什么时候会是Ready状态,会在pod的状态管理部分处理,主要是根据containerReady和Readiness Gate(这个是在1.11加入的新功能,1.14GA)两部分来决定,本文不会深入分析这个部分,在后续的状态管理的文章将进一步分析这个问题。这里的Readiness Gate功能也将为我们所用,主要是为了让用户自行决定一个pod是否Ready,能否正常提供服务。

到这里endpointController这个控制器都分析完了。如果有想了解细节的还需要各位看官自行阅读源码了。