当前位置:首页 > 数码 > 基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)

基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)

admin4周前 (04-26)数码19

本文剖析k8scontroller中informer启动的基本流程

不论是k8s自身组件,还是自己编写controller,都须要经过apiserver监听etcd事情来成功自己的控制循环逻辑。

如何高效牢靠启动事情监听,k8s客户端工具包client-go提供了一个通用的informer包,经过informer,可以繁难和高效的启动controller开发。

informer包提供了如下的一些性能:

1、本地缓存(store)

2、索引机制(indexer)

3、Handler注册性能(eventHandler)

1、informer架构

整个informer机制架构如下图(图片源自Client-go):

图片

可以看到这张图分为高低两个局部,上半局部由client-go提供,下半局部则是须要自己成功的控制循环逻辑

本文关键剖析上半局部的逻辑,包含上方几个组件:

1.1、Reflector:

从图上可以看到Reflector是一个和apiserver交互的组件,经过list和watchapi将资源对象压入队列

1.2、DeltaFifo:

DeltaFifo的结构体表示如下:

typeDeltaFIFOstruct{...//Wedependonthepropertythatitemsinthesetarein//thequeueandviceversa,andthatallDeltasinthis//maphaveatleastoneDelta.itemsmap[string]Deltasqueue[]string...}

关键分为两局部,fifo和delta

(1)fifo:先进先出队列

对应结构体中的queue,结构体示例如下:

[default/-fd77b5886-pfrgn,xxx,xxx]

(2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个map,结构体示例如下:

map:{"default/centos-fd77b5886-pfrgn":[{Replaced&Pod{ObjectMeta:${pod参数}],"xxx":[{},{}]}

消费者从queue中pop出对象启动消费,并从items失掉详细的消费操作(执执行作Update/Deleted/Sync,和执行的对象objectspec)

1.3、Indexer:

client-go用来存储资源对象并自带索引性能的本地存储,deltaFIFO中pop出的对象将存储到Indexer。

indexer与etcd集群中的数据坚持分歧,从而client-go可以间接从本地缓存失掉资源对象,缩小apiserver和etcd集群的压力。

2、一个基本例子

funcmn(){stopCh:=make(chanstruct{})deferclose(stopCh)//(1)Newak8sclientsetmasterUrl:="172.27.32.110:8080"config,err:=clientcmd.BuildConfigFromFlags(masterUrl,"")iferr!=nil{klog.Errorf("BuildConfigFromFlagserr,err:%v",err)}clientset,err:=k.NewForConfig(config)iferr!=nil{klog.Errorf("Getclientseterr,err:%v",err)}//(2)NewasharedInformersfactorysharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)//(3)Registerainformer//f.informers[informerType]=informer,//thedetailforinformerisbuildinNewFilteredPodInformer()podInformer:=sharedInformers.Core().V1().Pods().Informer()//(4)RegistereventhandlerpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:func(objinterface{}){mObj:=obj.(v1.Object)klog.Infof("Getnewobj:%v",mObj)klog.Infof("Getnewobjname:%s",mObj.GetName())},})//(5)StartallinformerssharedInformers.Start(stopCh)//(6)Acronjobforcachesyncif!cache.WaitForCacheSync(stopCh,podInformer.HasSynced){klog.Infof("Cachesyncfail!")}//(7)UselisterpodLister:=sharedInformers.Core().V1().Pods().Lister()pods,err:=podLister.List(labels.Everything())iferr!=nil{klog.Infof("err:%v",err)}klog.Infof("len(pods),%d",len(pods))for_,v:=rangepods{klog.Infof("pod:%s",v.Name)}<-stopChan}

上方就是一个繁难的informer的经常使用例子,整个环节如上述几个步骤,着重说一下(2)、(3)、(4)、(5)四个步骤

3、流程剖析

3.1、NewasharedInformersfactory

sharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)factory:=&sharedInformerFactory{client:client,namespace:v1.NamespaceAll,defaultResync:defaultResync,informers:make(map[reflect.Type]cache.SharedIndexInformer),startedInformers:make(map[reflect.Type]bool),customResync:make(map[reflect.Type]time.Duration),}

这个环节就是创立一个informer的工厂sharedInformerFactory,sharedInformerFactory中有一个informers对象,外面是一个informer的map,sharedInformerFactory是为了防止过多的重复informer监听apiserver,造成apiserver压力过大,在同一个服务中,不同的controller经常使用同一个informer

3.2、Registerainformer

这个环节关键是生成和注册informer到sharedInformerFactory

podInformer:=sharedInformers.Core().V1().Pods().Informer()func(f*podInformer)Informer()cache.SharedIndexInformer{returnf.factory.InformerFor(&corev1.Pod{},f.defaultInformer)}###f.factory.InformerFor:###注册informerfunc(f*sharedInformerFactory)InformerFor(objruntime.Object,newFuncinternalinterfaces.NewInformerFunc)cache.SharedIndexInformer{...informer=newFunc(f.client,resyncPeriod)f.informers[informerType]=informerreturninformer}###f.defaultInformer:###生成informerfunc(f*podInformer)defaultInformer(clientk.Interface,resyncPeriodtime.Duration)cache.SharedIndexInformer{returnNewFilteredPodInformer(client,f.namespace,resyncPeriod,cache.Indexers{cache.NamespaceIndex:cache.MetaNamespaceIndexFunc},f.tweakListOptions)}funcNewFilteredPodInformer(clientk.Interface,namespacestring,resyncPeriodtime.Duration,indexerscache.Indexers,tweakListOptionsinternalinterfaces.TweakListOptionsFunc)cache.SharedIndexInformer{returncache.NewSharedIndexInformer(&cache.ListWatch{ListFunc:func(optionsmetav1.ListOptions)(runtime.Object,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).List(context.TODO(),options)},WatchFunc:func(optionsmetav1.ListOptions)(watch.Interface,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).Watch(context.TODO(),options)},},&corev1.Pod{},resyncPeriod,indexers,)}###cache.NewSharedIndexInformer:funcNewSharedIndexInformer(lwListerWatcher,exampleObjectruntime.Object,defaultEventHandlerResyncPeriodtime.Duration,indexersIndexers)SharedIndexInformer{realClock:=&clock.RealClock{}sharedIndexInformer:=&sharedIndexInformer{processor:&sharedProcessor{clock:realClock},indexer:NewIndexer(DeletionHandlingMetaNamespaceKeyFunc,indexers),listerWatcher:lw,objectType:exampleObject,resyncCheckPeriod:defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod:defaultEventHandlerResyncPeriod,cacheMutationDetector:NewCacheMutationDetector(fmt.Sprintf("%T",exampleObject)),clock:realClock,}returnsharedIndexInformer}

首先经过f.defaultInformer方法生成informer,而后经过f.factory.InformerFor方法,将informer注册到sharedInformerFactory

3.3、Registereventhandler

这个环节展现如何注册一个回调函数,以及如何触发这个回调函数

###podInformer.AddEventHandler:func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){...listener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)if!s.started{s.processor.addListener(listener)return}...}###s.processor.addListener(listener):func(p*sharedProcessor)addListener(listener*processorListener){p.addListenerLocked(listener)ifp.listenersStarted{p.wg.Start(listener.run)p.wg.Start(listener.pop)}}###listener.run:func(p*processorListener)run(){//thiscallblocksuntilthechannelisclosed.Whenapanichensduringthenotification//wewillcatchit,**theoffendingitemwillbeskipped!**,andafterashortdelay(onesecond)//thenextnotificationwillbeattempted.Thisisusuallybetterthanthealternativeofnever//deliveringagain.stopCh:=make(chanstruct{})wait.Until(func(){fornext:=rangep.nextCh{switchnotification:=next.(type){//经过next结构体自身的类型来判别事情类型caseupdateNotification:p.handler.OnUpdate(notification.oldObj,notification.newObj)caseaddNotification:p.handler.OnAdd(notification.newObj)casedeleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognizednotification:%T",next))}}//theonlywaytogethereisifthep.nextChisemptyandclosedclose(stopCh)},1*time.Second,stopCh)}###listener.pop:func(p*processorListener)pop(){varnextChchan<-interface{}varnotificationinterface{}for{select{casenextCh<-notification://Notificationdispatchedvarokboolnotification,ok=p.pendingNotifications.ReadOne()if!ok{//NothingtopopnextCh=nil//Disablethisselectcase}casenotificationToAdd,ok:=<-p.addCh:if!ok{return}ifnotification==nil{//Nonotificationtopop(andpendingNotificationsisempty)//Optimizethecase-skipaddingtopendingNotificationsnotification=notificationToAddnextCh=p.nextCh}else{//Thereisalreadyanotificationwaitingtobedispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}}

这个环节总结就是:

(1)AddEventHandler到sharedProcessor,注册事情回调函数到sharedProcessor

(2)listenerpop方法里会监听p.addCh,经过nextCh=p.nextCh将addCh将事情传递给p.nextCh

(3)listenerrun方法里会监听p.nextCh,收到信号之后,判别是属于什么类型的方法,并且执行前面注册的Handler

所以后面须要关注当资源对象出现变卦时,是如何将变卦信号给p.addCh,进一步触发回调函数的

Kubernetes

3.4、Startallinformers

经过sharedInformers.Start(stopCh)启动一切的informer,代码如下:

//Startinitializesallrequestedinformers.func(f*sharedInformerFactory)Start(stopCh<-chanstruct{}){forinformerType,informer:=rangef.informers{if!f.startedInformers[informerType]{goinformer.Run(stopCh)f.startedInformers[informerType]=true}}}

咱们的例子中其实就只启动了PodInformer,接上去看到podInformer的Run方法做了什么

###goinformer.Run(stopCh):func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{//DeltafifoKnownObjects:s.indexer,EmitDeltaTypeReplaced:true,})cfg:=&Config{Queue:fifo,//DeltafifoListerWatcher:s.listerWatcher,//listerWatcherObjectType:s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:false,ShouldResync:s.processor.shouldResync,//HandleDeltas,addedtoprocess,anddoneinprocessloopProcess:s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,}func(){...s.controller=New(cfg)...}s.controller.Run(stopCh)}###s.controller.Run(stopCh)func(c*controller)Run(stopCh<-chanstruct{}){r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)c.reflector=r//Runreflectorwg.StartWithChannel(stopCh,r.Run)//RunprocessLoop,popfromdeltafifoanddoProcessFunc,//ProcessFuncisthes.HandleDeltasbeforewait.Until(c.processLoop,time.Second,stopCh)}

可以看到上方的逻辑首先生成一个DeltaFifo,而后接上去的逻辑分为两块,消费和消费:

(1)消费—r.Run:

关键的逻辑就是应用listandwatch将资源对象包含操作类型压入队列DeltaFifo

####r.Run:func(r*Reflector)Run(stopCh<-chanstruct{}){//执行listAndWatchiferr:=r.ListAndWatch(stopCh);}//执行ListAndWatch流程func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{//1、list://(1)、listpods,实践调用的是podInformer里的ListFunc方法,//client.CoreV1().Pods(namespace).List(context.TODO(),options)r.listerWatcher.List(opts)//(2)、失掉资源版本号,用于watchresourceVersion=listMetaInterface.GetResourceVersion()//(3)、数据转换,转换成列表items,err:=meta.ExtractList(list)//(4)、将资源列表中的资源对象和版本号存储到DeltaFifo中r.syncWith(items,resourceVersion);//2、watch,有限循环去watchapiserver,当watch到事情的时刻,执行watchHandler将event事情压入fifofor{//(1)、watchpods,实践调用的是podInformer里的WatchFunc方法,//client.CoreV1().Pods(namespace).Watch(context.TODO(),options)w,err:=r.listerWatcher.Watch(options)//(2)、watchHandler//watchHandlerwatchespod,降级DeltaFifo消息,并且降级resourceVersioniferr:=r.watchHandler(start,w,&resourceVersion,resyncerrc,stopCh);}}###r.watchHandler//watchHandlerwatcheswandkeeps*resourceVersionuptodate.func(r*Reflector)watchHandler(starttime.Time,wwatch.Interface,resourceVersion*string,errcchanerror,stopCh<-chanstruct{})error{...loop:for{select{caseevent,ok:=<-w.ResultChan():newResourceVersion:=meta.GetResourceVersion()switchevent.Type{casewatch.Added:err:=r.store.Add(event.Object)//Addeventtosrore,store的详细方法在fifo中iferr!=nil{utilruntime.HandleError(fmt.Errorf("%s:unabletoaddwatcheventobject(%#v)tostore:%v",r.name,event.Object,err))}...}*resourceVersion=newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}...}###r.store.Add:##即为deltaFifo的add方法:func(f*DeltaFIFO)Add(objinterface{})error{...returnf.queueActionLocked(Added,obj)...}func(f*DeltaFIFO)queueActionLocked(actionTypeDeltaType,objinterface{})error{id,err:=f.KeyOf(obj)iferr!=nil{returnKeyError{obj,err}}newDeltas:=append(f.items[id],Delta{actionType,obj})newDeltas=dedupDeltas(newDeltas)iflen(newDeltas)>0{if_,exists:=f.items[id];!exists{f.queue=append(f.queue,id)}f.items[id]=newDeltasf.cond.Broadcast()//通知一切阻塞住的消费者}...returnnil}
(2)消费—c.processLoop:

消费逻辑就是从DeltaFifopop出对象,而后做两件事情:(1)触发前面注册的eventhandler(2)降级本地索引缓存indexer,坚持数据和etcd分歧

func(c*controller)processLoop(){for{obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))}}###Queue.Pop:##Queue.Pop是一个带有处置函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){for{//有限循环forlen(f.queue)==0{f.cond.Wait()//阻塞直到消费端broadcast方法通知}id:=f.queue[0]item,ok:=f.items[id]delete(f.items,id)err:=process(item)//执行处置方法ife,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)//假设处置失败的从新添加到fifo中从新处置err=e.Err}returnitem,err}}###c.config.Process:##c.config.Process是在初始化controller的时刻赋值的,即为前面的s.HandleDeltas###s.HandleDeltas:func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{s.blockDeltas.Lock()defers.blockDeltas.Unlock()//fromoldesttonewestfor_,d:=rangeobj.(Deltas){switchd.Type{caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{iferr:=s.indexer.Update(d.Object);err!=nil{returnerr}isSync:=falseswitch{cased.Type==Sync://SynceventsareonlypropagatedtolistenersthatrequestedresyncisSync=truecased.Type==Replaced:ifaccessor,err:=meta.Accessor(d.Object);err==nil{ifoldAccessor,err:=meta.Accessor(old);err==nil{//Replacedeventsthatdidn'tchangeresourceVersionaretreatedasresyncevents//andonlypropagatedtolistenersthatrequestedresyncisSync=accessor.GetResourceVersion()==oldAccessor.GetResourceVersion()}}}s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)}else{iferr:=s.indexer.Add(d.Object);err!=nil{returnerr}s.processor.distribute(addNotification{newObj:d.Object},false)}caseDeleted:iferr:=s.indexer.Delete(d.Object);err!=nil{returnerr}s.processor.distribute(deleteNotification{oldObj:d.Object},false)}}returnnil}

可以看到上方关键执行两局部逻辑:

s.processor.distribute
####s.processor.distribute:###例如新增通知:s.processor.distribute(addNotification{newObj:d.Object},false)###其中addNotification就是add类型的通知,前面会经过notification结构体的类型来执行不同的eventHandlerfunc(p*sharedProcessor)distribute(objinterface{},syncbool){p.listenersLock.RLock()deferp.listenersLock.RUnlock()ifsync{for_,listener:=rangep.syncingListeners{listener.add(obj)}}else{for_,listener:=rangep.listeners{listener.add(obj)}}}func(p*processorListener)add(notificationinterface{}){p.addCh<-notification//新增notification到addCh}

这里p.addCh对应到前面说的关注对象p.addCh,processorListener收到addCh信号之后传递给nextCh,而后经过notification结构体的类型来执行不同的eventHandler

s.indexer的增删改:

这个就是本地数据的缓存和索引,自定义控制逻辑外面会经过indexer失掉操作对象的详细参数,这里就不开展细讲了。

4、总结

至此一个informer的client-go局部的流程就走完了,可以看到启动informer关键流程就是:

1、ReflectorListAndWatch:

(1)经过一个reflectorrun起来一个带有list和watchapi的client

(2)list到的pod列表经过DeltaFifo存储,并降级最新的ResourceVersion

(3)继续监听pod,监听到的pod操作事情继续存储到DeltaFifo中

2、DeltaFifo消费和消费:

(1)消费:listandwatch到的事情消费压入队列DeltaFifo

(2)消费:执行注册的eventHandler,并降级本地indexer

所以informer实质其实就是一个经过deltaFifo建设消费消费机制,并且带有本地缓存和索引,以及可以注册回调事情的apiServer的客户端库。

5、参考

免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。

标签: Kubernetes

“基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)” 的相关文章

Kubernetes-集群的十年历程-管理-踩过的十个大坑 (kubernetes)

Kubernetes-集群的十年历程-管理-踩过的十个大坑 (kubernetes)

Kubernetes是容器技术的绝对王者,它允许我们在YAML文件中描述应用程序的外观,然后Kubernetes会完成其余的工作。 高效管理Kubernetes集群至关重要。本文总结了管理K...

Kubernetes-Kubernetes-深化了解-中的网络原理和最佳通常-网络模型综合指南 (kubernetes与docker的关系)

Kubernetes-Kubernetes-深化了解-中的网络原理和最佳通常-网络模型综合指南 (kubernetes与docker的关系)

这篇详细的博文讨论了Kubees网络的复杂性,提供了关于如何在容器化环境中确保高效和安保通讯的见地。 译自NavigatingtheNetwork:AComprehensiveGuideto...

分步实现指南-基于Kubernetes构建Nacos高可用集群 (分步实施的步骤)

分步实现指南-基于Kubernetes构建Nacos高可用集群 (分步实施的步骤)

前提条件 安装并配置 Kubernetes 集群。 准备持久化存储(如 NFS、PV 等)用于保存 Nacos 数据。 修改 Nacos 配置 按照以下步骤...

b-b-href=-a-a-开发者Kubernetes懒人指南 (b-b-href=-a-a-开发者Kubernetes懒人指南)

b-b-href=-a-a-开发者Kubernetes懒人指南 (b-b-href=-a-a-开发者Kubernetes懒人指南)

你可以将本文作为开发者极速了解Kubees的指南。从基础常识到更初级的主题,如HelmChart,以及一切这些如何影响你作为开发者。 译自KubernetesforLazyDeveloper...

优秀Kubernetes工具的最终指南 (优秀库)

优秀Kubernetes工具的最终指南 (优秀库)

引言 Kubernetes 是用于管理容器化应用程序编排的领先平台。它提供了出色的功能,例如自动扩展、自动修复和负载平衡,这些功能使其成为软件工程师的首选解决方案。Kubernetes 的管理可...

深入了解-不够用时-调试的救星-superdebug-当debug-Kubernetes (深入了解不够)

深入了解-不够用时-调试的救星-superdebug-当debug-Kubernetes (深入了解不够)

kubectlexec 命令的限制 kubectlexec 命令用于在正在运行的 Pod 中执行命令,但它在 Kubernetes 中有以下限制: 不能以 root 身份运行:容...

100个常用命令-Kubernetes-提升集群管理和故障排除效率 (100个常用的关联词)

100个常用命令-Kubernetes-提升集群管理和故障排除效率 (100个常用的关联词)

本指南提供了全面的命令清单,用于诊断 Kubernetes 集群以及在其中运行的应用程序。请在使用这些命令时务必将占位符(如 <namespace> 和...

Gateway-维护者透露未来规划-API-Kubernetes-上线-1.0 (gateway翻译成中文)

Gateway-维护者透露未来规划-API-Kubernetes-上线-1.0 (gateway翻译成中文)

经过四年的努力,Kubernetes Gateway API 现已达到生产就绪状态。它提供了标准化的方法来管理进出 Kubernetes 集群的网络流量。 新特性...