Client-go Clients

This section mainly introduces the four kinds of clients.

RESTClient is the most basic client: it wraps the HTTP client into a RESTful-style API.

ClientSet, DynamicClient, and DiscoveryClient are all built on top of RESTClient. ClientSet adds the Resource and Version layers on top of RESTClient, and it can only handle built-in resources.

The biggest difference between DynamicClient and ClientSet is that ClientSet can only access the resources that ship with Kubernetes (i.e., the resources in its client set) and cannot directly access CRD custom resources, whereas DynamicClient can handle every resource object in Kubernetes, including both built-in resources and CRD custom resources.

DiscoveryClient is the discovery client: it is used to discover the resource groups, resource versions, and resource information (i.e., Groups, Versions, and Resources) supported by kube-apiserver.
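As a quick orientation, here is a minimal sketch of how these clients are typically constructed from the same rest.Config; the kubeconfig path is a placeholder and error handling is shortened.

package main

import (
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig path; in-cluster code would use rest.InClusterConfig() instead.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// ClientSet: typed access to built-in resources only.
	clientSet, _ := kubernetes.NewForConfig(cfg)

	// DynamicClient: unstructured access to any resource, including CRDs.
	dynamicClient, _ := dynamic.NewForConfig(cfg)

	// DiscoveryClient: enumerates the Groups/Versions/Resources the apiserver supports.
	discoveryClient, _ := discovery.NewDiscoveryClientForConfig(cfg)

	_, _, _ = clientSet, dynamicClient, discoveryClient
}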

Informer

Let's first look at the Informer architecture diagram, which summarizes how the pieces fit together.

(Figure: Informer architecture)

The rest of this section uses a Deployment Informer as an example to walk through how an Informer actually works.

client, err := kubernetes.NewForConfig(cfg)
if err != nil {
	klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

// One shared informer factory for all built-in resources, resyncing every 30s.
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(client, time.Second*30)

controller := NewController(client, kubeInformerFactory.Apps().V1().Deployments())

// Start runs every informer that has been requested from the factory.
kubeInformerFactory.Start(stopCh)

The controller there is implemented as follows:

controller := &Controller{
	clientSet:         client,
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	workQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""),
}

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
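In a controller of this shape, the work queue is only drained after the informer cache has synced. A minimal, hypothetical Run method is sketched below: runWorker is assumed to exist and process items from workQueue, while cache.WaitForCacheSync is the standard client-go helper.

func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workQueue.ShutDown()

	// Block until the shared informer has completed its initial List.
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	// runWorker is a hypothetical method that pops keys off workQueue and reconciles them.
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	return nil
}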

First, look at the Deployment informer itself, kubeInformerFactory.Apps().V1().Deployments(), which instantiates a DeploymentInformer.

func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

The rest of the analysis revolves around the deploymentInformer instance created above. Its Informer() method is called first; inside it, an instance implementing SharedIndexInformer is created.

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

In the InformerFor call above, an informer is created for the given resource type; if one already exists it is not created again, which is how informers end up being shared.

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

If no informer exists for that type yet, newFunc is called to create one; here newFunc is the defaultInformer shown above, which ultimately calls NewSharedIndexInformer to create the sharedIndexInformer instance.

func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

Now let's look at the fields of sharedIndexInformer in more detail. The processor is populated through the AddEventHandler function.

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	......

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

newProcessListener instantiates a processorListener, and the listener is then added to the sharedProcessor's listeners and syncingListeners.

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}
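The registration step itself is not shown above. Roughly, sharedProcessor.addListener appends the listener to both slices and, if the processor is already running, starts the listener's run and pop goroutines immediately; a simplified sketch:

// Simplified sketch of how sharedProcessor registers a listener.
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)

	// If the processor has already started, launch this listener's goroutines now.
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}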

Next, the Indexer. When the sharedIndexInformer is initialized, its indexers are set up via cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, and the corresponding index is built with NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers). The implementation shows that the underlying storage is a cache that implements the Indexer interface.

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  indices,
	}
}

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

Next, look at the Start operation.

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

Start launches each informer via sharedIndexInformer.Run, which drives the whole process.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Initialize the DeltaFIFO queue that feeds this informer
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

In Run, the DeltaFIFO is initialized first:

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}

The data-change handler HandleDeltas is set as the Process callback:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Then the controller is instantiated:

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

The event handling is started via s.processor.run. In run, each listener takes notifications from nextCh and, according to the notification type, hands them to the three event handler functions registered earlier with AddEventHandler (AddFunc, UpdateFunc, DeleteFunc).

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
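For reference, here is a simplified sketch of the two goroutines started for each listener (the crash handler and the retry wrapper around run are omitted): pop shuttles notifications from addCh into nextCh, buffering them in the ring buffer when the consumer is slow, and run dispatches each notification to the registered handler.

// pop moves notifications from addCh to nextCh, parking them in the
// pendingNotifications ring buffer whenever run is not ready to receive.
func (p *processorListener) pop() {
	defer close(p.nextCh)

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Delivered; fetch the next buffered notification, if any.
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok {
				nextCh = nil // nothing pending, disable this case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil {
				// Nothing in flight: send this one directly.
				notification = notificationToAdd
				nextCh = p.nextCh
			} else {
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

// run drains nextCh and invokes the user-supplied ResourceEventHandler.
func (p *processorListener) run() {
	for next := range p.nextCh {
		switch notification := next.(type) {
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		}
	}
}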

As sketched above, the events in nextCh are delivered by processorListener.pop, which moves data from addCh into nextCh; where the data in addCh comes from is analyzed further below. Next, look at the controller.Run function:

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}

It first initializes the Reflector:

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
	r.setExpectedType(expectedType)
	return r
}

Then the reflector is started via r.Run:

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}

Then c.processLoop is executed; this loop pops entries from the DeltaFIFO for processing, which in turn puts the data onto the addCh mentioned above.

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

The actual processing function here is s.HandleDeltas, which was set as Process in the Config earlier:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			// Cache the object in the indexer's store (the cache)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

The object is first cached into the cache through the s.indexer interfaces, and then s.processor.distribute forwards it over the channels to the event handlers described earlier.

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}
// add pushes a notification onto the listener's addCh.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

All of the Informer's interaction with kube-apiserver happens here; ListAndWatch is a fairly long function. It first lists all objects by calling r.listerWatcher.List, puts all of them into the queue, and records the resourceVersion. ListAndWatch also starts a timer goroutine: whenever the resync period is reached, all known keys are pushed back into the DeltaFIFO to perform the resync.

go func() {
  resyncCh, cleanup := r.resyncChan()
  defer func() {
    cleanup() // Call the last one written into cleanup
  }()
  for {
    select {
    case <-resyncCh:
    case <-stopCh:
      return
    case <-cancelCh:
      return
    }
    if r.ShouldResync == nil || r.ShouldResync() {
      klog.V(4).Infof("%s: forcing resync", r.name)
      if err := r.store.Resync(); err != nil {
        resyncerrc <- err
        return
      }
    }
    cleanup()
    resyncCh, cleanup = r.resyncChan()
  }
}()
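The list phase that runs before this goroutine looks roughly like the following (a simplified sketch based on the Reflector fields quoted above; the listed items are converted and handed to the store, i.e. the DeltaFIFO, via Replace):

options := metav1.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
if err != nil {
	return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
	return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
// Remember the resourceVersion of the list; the watch below resumes from it.
resourceVersion := listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
	return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
// Replace the entire contents of the store (the DeltaFIFO) with the listed objects.
found := make([]interface{}, 0, len(items))
for _, item := range items {
	found = append(found, item)
}
if err := r.store.Replace(found, resourceVersion); err != nil {
	return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}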

Finally, the watch is started using the resourceVersion obtained from the list above.

for {
  // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
  select {
  case <-stopCh:
    return nil
  default:
  }

  timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  options = metav1.ListOptions{
    ResourceVersion: resourceVersion,
    // We want to avoid situations of hanging watchers. Stop any wachers that do not
    // receive any events within the timeout window.
    TimeoutSeconds: &timeoutSeconds,
    // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
    // Reflector doesn't assume bookmarks are returned at all (if the server do not support
    // watch bookmarks, it will ignore this field).
    AllowWatchBookmarks: true,
  }

  // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
  start := r.clock.Now()
  w, err := r.listerWatcher.Watch(options)
  if err != nil {
    switch err {
    case io.EOF:
      // watch closed normally
    case io.ErrUnexpectedEOF:
      klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
    default:
      utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
    }
    // If this is "connection refused" error, it means that most likely apiserver is not responsive.
    // It doesn't make sense to re-list all objects because most likely we will be able to restart
    // watch where we ended.
    // If that's the case wait and resend watch request.
    if utilnet.IsConnectionRefused(err) {
      time.Sleep(time.Second)
      continue
    }
    return nil
  }

  if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
    if err != errorStopRequested {
      switch {
      case apierrs.IsResourceExpired(err):
        klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
      default:
        klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
      }
    }
    return nil
  }
}

The last piece is r.watchHandler, which, according to the event type, adds the watched objects into the DeltaFIFO, from which they are consumed by the Pop loop described earlier.
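A simplified sketch of the core loop inside watchHandler is shown below (bookkeeping such as event counters and logging is omitted; errc is the resync error channel passed in by ListAndWatch): every watch event is applied to the store, i.e. the DeltaFIFO, and the resourceVersion is advanced so the watch can resume from the right point.

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			m, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := m.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err = r.store.Add(event.Object)
			case watch.Modified:
				err = r.store.Update(event.Object)
			case watch.Deleted:
				err = r.store.Delete(event.Object)
			case watch.Bookmark:
				// A bookmark only advances the resourceVersion; nothing is stored.
			}
			if err != nil {
				utilruntime.HandleError(err)
			}
			*resourceVersion = newResourceVersion
		}
	}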

Indexer

Here the concrete implementation of the Indexer storage interface is cache.

type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	// keyFunc computes the key used to store and later look up an object; by default it is namespace/name
	keyFunc KeyFunc
}

The Indexer interface:

type Indexer interface {
	Store
	// Index returns the stored objects whose set of indexed values
	// intersects the set of indexed values of the given object, for
	// the named index
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the storage keys of the stored objects whose
	// set of indexed values for the named index includes the given
	// indexed value
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns all the indexed values of the given index
	ListIndexFuncValues(indexName string) []string
	// ByIndex returns the stored objects whose set of indexed values
	// for the named index includes the given indexed value
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

The cacheStorage inside cache is implemented by threadSafeMap:

type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

type Index map[string]sets.String

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

The default Indexers is cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, where the key is the index name "namespace" and the value is the function that extracts an object's namespace. Let's walk through an object update to see how these structures relate.

// Walk the index functions registered in indexers (by default only "namespace").
for name, indexFunc := range c.indexers {
	// Compute the index values for the object; with the default IndexFunc this is its namespace.
	indexValues, err := indexFunc(newObj)
	if err != nil {
		panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
	}
	// Fetch the Index for this index name from indices (by default "namespace"); create it if missing.
	index := c.indices[name]
	if index == nil {
		index = Index{}
		c.indices[name] = index
	}
	// For each index value (e.g. the namespace), take the set of store keys and insert this
	// object's key, which by default has the namespace/name form.
	for _, indexValue := range indexValues {
		set := index[indexValue]
		if set == nil {
			set = sets.String{}
			index[indexValue] = set
		}
		set.Insert(key)
	}
}

indexers stores the functions used to compute index values, in map[name]IndexFunc form. Indices stores the indexes for the objects, in map[string]Index form, and its keys correspond one-to-one with those in indexers. For a given object, the index value computed by the IndexFunc is used to look up in the Index the keys under which objects are stored in items; by default a key has the namespace/objname form, with an empty prefix for objects that have no namespace. The key is then used to fetch the object from items.
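To make the relationship concrete, here is a small hypothetical lookup against the indexer that backs the Deployment informer from earlier (the namespace and the object key are made-up values):

// Hypothetical example: query the Deployment informer's indexer directly.
indexer := deploymentInformer.Informer().GetIndexer()

// All Deployments whose "namespace" index value is "default".
objs, err := indexer.ByIndex(cache.NamespaceIndex, "default")
if err != nil {
	klog.Fatal(err)
}
for _, obj := range objs {
	d := obj.(*appsv1.Deployment)
	fmt.Printf("%s/%s\n", d.Namespace, d.Name)
}

// Or fetch a single object by its default namespace/name key.
obj, exists, err := indexer.GetByKey("default/nginx")
_, _, _ = obj, exists, err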

(Figure: Index structure)

At this point the whole flow described above basically fits together. A few details are left for the reader to fill in on their own.