// Build the typed clientset, create a shared informer factory with a
// 30-second resync interval, construct the controller from the Deployment
// informer, and start all informers.
Client, err := kubernetes.NewForConfig(cfg)
if err != nil {
	klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(Client, time.Second*30)
controller := NewController(Client, kubeInformerFactory.Apps().V1().Deployments())
kubeInformerFactory.Start(stopCh)
// Register event handlers on the Deployment informer. Adds and deletes are
// forwarded straight to the controller; updates are filtered so the periodic
// resync's no-op updates (same ResourceVersion on both sides) are skipped.
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})

// NOTE(review): "clinetSet" looks like a typo for "clientSet" in the
// Controller struct declaration (not shown here) — confirm and fix at the
// declaration site. Also consider giving the rate-limiting work queue a
// non-empty name so its metrics are identifiable.
controller := &Controller{
	clinetSet:         Client,
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	workQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""),
}
func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){s.startedLock.Lock()defers.startedLock.Unlock()ifs.stopped{klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already",handler)return}......listener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)if!s.started{s.processor.addListener(listener)return}s.blockDeltas.Lock()defers.blockDeltas.Unlock()s.processor.addListener(listener)for_,item:=ranges.indexer.List(){listener.add(addNotification{newObj:item})}}
funcNewIndexer(keyFuncKeyFunc,indexersIndexers)Indexer{return&cache{cacheStorage:NewThreadSafeStore(indexers,Indices{}),keyFunc:keyFunc,}}funcNewThreadSafeStore(indexersIndexers,indicesIndices)ThreadSafeStore{return&threadSafeMap{items:map[string]interface{}{},indexers:indexers,indices:indices,}}// Index maps the indexed value to a set of keys in the store that match on that value
typeIndexmap[string]sets.String// Indexers maps a name to a IndexFunc
typeIndexersmap[string]IndexFunc// Indices maps a name to an Index
typeIndicesmap[string]Index
func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()//初始化一个队列
fifo:=NewDeltaFIFO(MetaNamespaceKeyFunc,s.indexer)cfg:=&Config{Queue:fifo,ListerWatcher:s.listerWatcher,ObjectType:s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:false,ShouldResync:s.processor.shouldResync,Process:s.HandleDeltas,}func(){s.startedLock.Lock()defers.startedLock.Unlock()s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true}()// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh:=make(chanstruct{})varwgwait.Groupdeferwg.Wait()// Wait for Processor to stop
deferclose(processorStopCh)// Tell Processor to stop
wg.StartWithChannel(processorStopCh,s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh,s.processor.run)deferfunc(){s.startedLock.Lock()defers.startedLock.Unlock()s.stopped=true// Don't want any new listeners
}()s.controller.Run(stopCh)}
func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{s.blockDeltas.Lock()defers.blockDeltas.Unlock()// from oldest to newest
for_,d:=rangeobj.(Deltas){switchd.Type{caseSync,Added,Updated:isSync:=d.Type==Syncs.cacheMutationDetector.AddObject(d.Object)ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{iferr:=s.indexer.Update(d.Object);err!=nil{returnerr}s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)}else{iferr:=s.indexer.Add(d.Object);err!=nil{returnerr}s.processor.distribute(addNotification{newObj:d.Object},isSync)}caseDeleted:iferr:=s.indexer.Delete(d.Object);err!=nil{returnerr}s.processor.distribute(deleteNotification{oldObj:d.Object},false)}}returnnil}
然后再实例化 controller。
1
2
3
4
5
6
7
8
// New makes a new Controller from the given Config.
funcNew(c*Config)Controller{ctlr:=&controller{config:*c,clock:&clock.RealClock{},}returnctlr}
func(p*sharedProcessor)run(stopCh<-chanstruct{}){func(){p.listenersLock.RLock()deferp.listenersLock.RUnlock()for_,listener:=rangep.listeners{p.wg.Start(listener.run)p.wg.Start(listener.pop)}p.listenersStarted=true}()<-stopChp.listenersLock.RLock()deferp.listenersLock.RUnlock()for_,listener:=rangep.listeners{close(listener.addCh)// Tell .pop() to stop. .pop() will tell .run() to stop
}p.wg.Wait()// Wait for all .pop() and .run() to stop
}
funcNewReflector(lwListerWatcher,expectedTypeinterface{},storeStore,resyncPeriodtime.Duration)*Reflector{returnNewNamedReflector(naming.GetNameFromCallsite(internalPackages...),lw,expectedType,store,resyncPeriod)}// NewNamedReflector same as NewReflector, but with a specified name for logging
funcNewNamedReflector(namestring,lwListerWatcher,expectedTypeinterface{},storeStore,resyncPeriodtime.Duration)*Reflector{r:=&Reflector{name:name,listerWatcher:lw,store:store,period:time.Second,resyncPeriod:resyncPeriod,clock:&clock.RealClock{},}r.setExpectedType(expectedType)returnr}
再通过 r.Run 启动 reflector。
1
2
3
4
5
6
7
8
func(r*Reflector)Run(stopCh<-chanstruct{}){klog.V(3).Infof("Starting reflector %v (%s) from %s",r.expectedTypeName,r.resyncPeriod,r.name)wait.Until(func(){iferr:=r.ListAndWatch(stopCh);err!=nil{utilruntime.HandleError(err)}},r.period,stopCh)}
func(c*controller)processLoop(){for{obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))iferr!=nil{iferr==ErrFIFOClosed{return}ifc.config.RetryOnError{// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)}}}}
func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{s.blockDeltas.Lock()defers.blockDeltas.Unlock()// from oldest to newest
for_,d:=rangeobj.(Deltas){switchd.Type{caseSync,Added,Updated:isSync:=d.Type==Syncs.cacheMutationDetector.AddObject(d.Object)//将数据缓存到indexer的缓存中(cache)
ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{iferr:=s.indexer.Update(d.Object);err!=nil{returnerr}s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)}else{iferr:=s.indexer.Add(d.Object);err!=nil{returnerr}s.processor.distribute(addNotification{newObj:d.Object},isSync)}caseDeleted:iferr:=s.indexer.Delete(d.Object);err!=nil{returnerr}s.processor.distribute(deleteNotification{oldObj:d.Object},false)}}returnnil}
gofunc(){resyncCh,cleanup:=r.resyncChan()deferfunc(){cleanup()// Call the last one written into cleanup
}()for{select{case<-resyncCh:case<-stopCh:returncase<-cancelCh:return}ifr.ShouldResync==nil||r.ShouldResync(){klog.V(4).Infof("%s: forcing resync",r.name)iferr:=r.store.Resync();err!=nil{resyncerrc<-errreturn}}cleanup()resyncCh,cleanup=r.resyncChan()}}()
// Watch loop (excerpt from ListAndWatch): repeatedly opens a watch with a
// randomized timeout and dispatches events through watchHandler until the
// loop is stopped or an unrecoverable error occurs.
for {
	// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
	select {
	case <-stopCh:
		return nil
	default:
	}

	// Randomize the timeout in [minWatchTimeout, 2*minWatchTimeout) so that
	// watchers restarted together don't all expire at once.
	timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
	options = metav1.ListOptions{
		ResourceVersion: resourceVersion,
		// We want to avoid situations of hanging watchers. Stop any watchers that do not
		// receive any events within the timeout window.
		TimeoutSeconds: &timeoutSeconds,
		// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
		// Reflector doesn't assume bookmarks are returned at all (if the server does not
		// support watch bookmarks, it will ignore this field).
		AllowWatchBookmarks: true,
	}

	// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
	start := r.clock.Now()
	w, err := r.listerWatcher.Watch(options)
	if err != nil {
		switch err {
		case io.EOF:
			// watch closed normally
		case io.ErrUnexpectedEOF:
			klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
		default:
			utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
		}
		// If this is "connection refused" error, it means that most likely apiserver is not responsive.
		// It doesn't make sense to re-list all objects because most likely we will be able to restart
		// watch where we ended.
		// If that's the case wait and resend watch request.
		if utilnet.IsConnectionRefused(err) {
			time.Sleep(time.Second)
			continue
		}
		return nil
	}

	if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
		if err != errorStopRequested {
			switch {
			case apierrs.IsResourceExpired(err):
				klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
			default:
				klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
			}
		}
		return nil
	}
}
typecachestruct{// cacheStorage bears the burden of thread safety for the cache
cacheStorageThreadSafeStore// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
//这个keyFunc是后面用来获取对应的obj对象的,默认是namespace/name
keyFuncKeyFunc}
typeIndexerinterface{Store// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexNamestring,objinterface{})([]interface{},error)// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName,indexedValuestring)([]string,error)// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexNamestring)[]string// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName,indexedValuestring)([]interface{},error)// GetIndexer return the indexers
GetIndexers()Indexers// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexersIndexers)error}
cache 中的 cacheStorage 则是通过 threadSafeMap 实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typethreadSafeMapstruct{locksync.RWMutexitemsmap[string]interface{}// indexers maps a name to an IndexFunc
indexersIndexers// indices maps a name to an Index
indicesIndices}typeIndexmap[string]sets.String// Indexers maps a name to a IndexFunc
typeIndexersmap[string]IndexFunc// Indices maps a name to an Index
typeIndicesmap[string]Index
//先遍历indexers中的索引函数,默认是namespace
forname,indexFunc:=rangec.indexers{//由indexFunc获取需要操作的对象namespace
indexValues,err:=indexFunc(newObj)iferr!=nil{panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v",key,name,err))}//从indices中获取对应indexFunc的所有索引,默认是namespace,如果不存在就创建。
index:=c.indices[name]ifindex==nil{index=Index{}c.indices[name]=index}//通过namespace索引获取到Indexers结构,这个结构里存放的是每个namespace的所有key,这里的key默认都是namespace/name结构,如果不存在则把这个key添加到Indexers对应的namespace中。
for_,indexValue:=rangeindexValues{set:=index[indexValue]ifset==nil{set=sets.String{}index[indexValue]=set}set.Insert(key)}}