鉴于Informer架构及其处理逻辑蕴含了丰富的实战技术,本文将分为上下两章进行深入探讨。
上篇将专注于解析Informer中的Reflector组件,而下篇则会详尽分析Indexer模块。通过这种结构化的呈现方式,提供一个全面且系统的学习路径。
问题发现
在前一章节中,我们探讨了使用RestClient访问Kubernetes(K8S)集群资源的方法,并详细解析了其源码实现流程。值得注意的是,尽管RestClient构成了client-go与K8S交互的基础,但在实际应用中,更常见的是采用以下几种高级客户端来简化操作:
- ClientSet:基于RestClient构建,提供了对所有内置K8S资源类型的封装接口。通过ClientSet,开发者能够以更为便捷的方式直接调用这些资源而无需记忆具体的API路径、资源名称或版本信息。然而,这种便利性是以牺牲灵活性为代价的——ClientSet不支持自定义资源定义(CRD)对象的操作。
- DynamicClient:作为一款动态客户端工具,它允许用户针对任何Kubernetes资源执行查询操作,其中包括自定义资源定义(CRDs)。这一特性使得DynamicClient成为处理非标准或新引入资源的理想选择。
- DiscoveryClient:主要用于发现并获取由Kubernetes API服务器提供的资源列表信息。此外,在kubectl命令行工具中,DiscoveryClient还会将这些信息缓存至本地目录~/.kube/cache/discovery/下,从而加快后续请求的速度。
以上所述的各种客户端组件各自具备独特优势,根据具体需求灵活选用可显著提升开发效率及系统管理能力。
在这一过程中,clientSet的informer无疑扮演了极其重要的角色。那么,为什么需要引入informer?它又是如何被开发出来的呢?记住这一点至关重要:任何技术的诞生都是为了解决特定问题。为了回答这两个问题,反证法来看下面这一个问题:
如果所有组件及外部程序都直接通过clientSet或RestClient来访问Kubernetes集群资源,将会遇到哪些挑战?
- 并发性问题:当基础设施的存储资源被视为业务逻辑的一部分时,所有请求无限制地涌向API服务器会导致其负载显著增加(尽管API服务器具备一定的限流机制)。然而,一旦触发了限流阈值,将可能导致服务不可用,从而对业务造成负面影响。
- 缓存缺失问题:从restClient的实现来看,它缺乏有效的缓存策略,这意味着每次数据查询都需要经过API服务器,这不仅增加了网络延迟,也加重了API服务器的工作负担。
- 数据时效性问题:对于API服务器上发生的任何更新,restClient是无法即时感知到的,因此必须定期轮询API服务器以获取最新信息。随着系统规模的增长,这种做法会进一步加剧API服务器的并发压力。虽然可以使用clientSet提供的watch功能来监听变化,但这又引出了新的挑战。
- 版本一致性问题:采用clientSet进行监听操作时,若因网络不稳定等因素导致监听中断,则需手动执行全量同步(Resync)以确保本地缓存与远程状态的一致性。
- 错误处理机制不足:无论是restClient还是clientSet,它们都将错误恢复的责任交给了应用层,这意味着开发者需要自行设计和实现相应的容错逻辑。
- 在业务处理过程中,Kubernetes(k8s)依赖于事件驱动机制来管理资源状态的变化。因此,应用程序需要自行解析不同类型的事件(如添加、更新或删除),并据此维护相应的状态机,这无疑增加了代码实现的复杂度。
- 安全方面:访问Kubernetes集群主要依靠证书和Token进行身份验证与授权,但这一过程容易遇到配置不当或证书/Token过期的问题。
综上所述,informer作为clientSet的一部分,正是为了解决上述提到的各种问题而被引入的,它提供了一种更加高效且可靠的机制来管理Kubernetes对象的状态。
针对上述挑战,Informer提供了一种有效的解决方案:
- List/Watch机制:通过高效地监听Kubernetes集群内部发生的各类事件,Informer能够及时捕获资源状态变化。
- 事件处理简化:Informer引入了EventHandler接口,允许开发者将自定义的回调函数封装起来以响应特定事件,从而让开发者可以更加专注于业务逻辑的实现而非底层事件处理细节。
- 本地缓存支持:对于所有接收到的信息,Informer会在完成事件处理后将其存储至本地缓存中,并且提供了灵活多样的查询接口供应用层调用。
- 共享连接优化:利用共享的Informer实例,多个组件可以共同使用同一个Watch连接来监听相同的资源类型,比如当有十个不同的服务都需要监听Pod事件时,使用SharedInformer只需要建立一个Watch连接即可满足需求,相比之下直接使用ClientSet则需为每个服务单独创建一个独立的Watch连接。
- 定期同步:Informer还支持周期性地重新同步数据,确保本地缓存与实际集群状态保持一致。
综上所述,这些特性共同构成了Kubernetes生态系统中极为重要的Informer功能,极大地提升了开发效率及系统的稳定性。
接下来,我们将利用goanalysis结合测试代码对informer中的controller和Reflector进行深入分析。goanalysis能够在执行结束前提供所有已执行函数的堆栈信息及其参数快照,这有助于克服使用dlv调试时频繁需要重启程序的问题。
本示例基于client-go版本v0.30.1。
请注意,这里假设您已经熟悉Go语言环境以及Kubernetes相关组件的工作原理。
通过这种方式,我们旨在更高效地理解及调试informer机制内部复杂的交互过程。
初步体验
- import (<br> "log"<br> "time"<br><br> "github.com/toheart/functrace"<br> v1 "k8s.io/apimachinery/pkg/apis/meta/v1"<br> "k8s.io/client-go/informers"<br> "k8s.io/client-go/kubernetes"<br> "k8s.io/client-go/tools/cache"<br> "k8s.io/client-go/tools/clientcmd"<br> "k8s.io/klog/v2"<br>)<br><br>func main() {<br> defer func() {<br> functrace.CloseTraceInstance() # 透视代码函数结束, 不用在意<br> }()<br> config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")<br> if err != nil {<br> panic(err)<br> }<br> config.Timeout = time.Second * 5<br> clientset, err := kubernetes.NewForConfig(config)<br> if err != nil {<br> panic(err)<br> }<br> stopCh := make(chan struct{})<br> defer close(stopCh)<br> log.Printf("create new inforemer \n")<br> sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)<br><br> informer := sharedInformers.Core().V1().Pods().Informer()<br><br> informer.AddEventHandler(cache.ResourceEventHandlerFuncs{<br> AddFunc: func(obj interface{}) {<br> mObj := obj.(v1.Object)<br> log.Printf("New Pod Added to Store: %s", mObj.GetName())<br> },<br> UpdateFunc: func(oldObj, newObj interface{}) {<br> oObj := oldObj.(v1.Object)<br> nObj := newObj.(v1.Object)<br><br> log.Printf("%s Pod Update to %s", oObj.GetName(), nObj.GetName())<br> },<br> DeleteFunc: func(obj interface{}) {<br> mObj := obj.(v1.Object)<br> log.Printf("Pod Deleted from Store:%s", mObj.GetName())<br> },<br> })<br> sharedInformers.Start(stopCh)<br><br> if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {<br> klog.Fatal("failed to synced")<br> }<br> select {}<br>}<br><br>
复制代码 从上述描述中可以看出,Informer功能的实现流程相对直接,主要包含以下几个关键步骤:
- 获取认证凭据;
- 构建clientset实例;
- 利用clientset来初始化SharedInformerFactory对象;
- 为需要监听的具体资源创建对应的Informer;
- 定义针对不同资源事件的处理逻辑;
- 启动sharedInformers以开始监听和处理资源变动。
这样的结构化方法确保了Kubernetes集群内资源变更能够被高效地捕获与响应。
透视源码
初始化ClientSet
- clientset, err := kubernetes.NewForConfig(config)<br>if err != nil {<br> panic(err)<br>}<br>
复制代码 在前文所述中,我们提到clientSet会创建一系列内置资源对象,并将其存储于结构体之中。通过可视化界面,可以进一步观察到其中的细节。
抽出一个函数看:- cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient)<br>if err != nil {<br> return nil, err<br>}<br>
复制代码 其中,httpClient 是 RestClient 对象内部封装的 HTTP 客户端实例。
初始化ShardInformer
最终在执行过程中调用的是NewSharedInformerFactoryWithOptions函数。该函数的实现相对简洁:- func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {<br> factory := &sharedInformerFactory{<br> client: client, // clientset客户端<br> namespace: v1.NamespaceAll, // 使用""<br> defaultResync: defaultResync, // 默认重新同步时间<br> informers: make(map[reflect.Type]cache.SharedIndexInformer), // Informer<br> startedInformers: make(map[reflect.Type]bool), // informer是否已经启动<br> customResync: make(map[reflect.Type]time.Duration), // informer自定义同步时间<br> }<br><br> // Apply all options<br> for _, opt := range options {<br> factory = opt(factory)<br> }<br><br> return factory<br>}<br>
复制代码 根据上述内容,可以得出以下结论:
- 对于每个资源对象,shardInformer仅维护一个对应的Informer实例;
- 关于同步时间的设定,若在customResync中已定义,则优先采用该值;反之,在未指定的情况下,则使用默认的defaultResync配置。
创建podInformer
- informer := sharedInformers.Core().V1().Pods().Informer()<br>
复制代码 调用链如下:
核心代码如下:- func (f *podInformer) Informer() cache.SharedIndexInformer {<br> return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)<br>}<br><br>func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {<br> f.lock.Lock()<br> defer f.lock.Unlock()<br> // 如果 &corev1.Pod{} 存在, 则直接返回informer<br> informerType := reflect.TypeOf(obj)<br> informer, exists := f.informers[informerType]<br> if exists {<br> return informer<br> }<br> resyncPeriod, exists := f.customResync[informerType]<br> // 如果 resync 不存在, 则使用默认的Resync<br> if !exists {<br> resyncPeriod = f.defaultResync<br> }<br> // 创建新的informer, 将保存到map中。<br> informer = newFunc(f.client, resyncPeriod)<br> informer.SetTransform(f.transform) // 这里从goanalysis中可以看出其当前为Nil.<br> f.informers[informerType] = informer<br><br> return informer<br>}<br><br>func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {<br> s.startedLock.Lock()<br> defer s.startedLock.Unlock()<br> if s.started {<br> return fmt.Errorf("informer has already started")<br> <br> s.transform = handler<br> return nil<br>}<br>
复制代码 在上述过程中,用于创建Informer的函数是newFunc,该函数被赋值为f.defaultInformer。具体来说,Informer的创建流程如下:
- 定义或获取newFunc。
- 将f.defaultInformer赋值给newFunc。
- 通过调用newFunc来初始化并创建Informer实例。
- func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {<br> return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)<br>}<br><br>func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {<br> return cache.NewSharedIndexInformer(<br> &cache.ListWatch{<br> ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {<br> if tweakListOptions != nil {<br> tweakListOptions(&options)<br> }<br> return client.CoreV1().Pods(namespace).List(context.TODO(), options)<br> },<br> WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {<br> if tweakListOptions != nil {<br> tweakListOptions(&options)<br> }<br> return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)<br> },<br> },<br> &corev1.Pod{},<br> resyncPeriod,<br> indexers,<br> )<br>}<br><br>func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {<br> realClock := &clock.RealClock{}<br><br> return &sharedIndexInformer{<br> indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),<br> processor: &sharedProcessor{clock: realClock},<br> // NewFilteredPodInformer cache.ListWatch<br> listerWatcher: lw,<br> // v1.Pod 结构体<br> objectType: exampleObject, <br> objectDescription: options.ObjectDescription, //""<br> resyncCheckPeriod: options.ResyncPeriod, // 60000000000<br> defaultEventHandlerResyncPeriod: options.ResyncPeriod, // 60000000000<br> clock: realClock,<br> // "*v1.Pod"<br> cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), <br> }<br>}<br>
复制代码 最终通过调用NewSharedIndexInformerWithOptions函数来创建共享索引通知器。该过程的核心要素包括:
- List/Watch机制:此机制底层基于特定函数的初始化,旨在同步对象状态并监听这些对象的变化,从而确保数据源的实时性和准确性。
- Indexer对象初始化:这一步骤建立了本地缓存系统,允许用户在查询时直接访问本地存储的信息而非向API服务器发起请求,从而提高响应速度和效率。
- SharedProcessor对象初始化:此组件负责将从Watch接口接收到的对象变更事件转发给相应的Informer进行进一步处理,保证了信息流的有效传递与处理。
以上三个组成部分共同构成了SharedIndexInformer的核心架构,为其高效运作提供了坚实的基础。
添加EventHandler
- func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {<br> s.startedLock.Lock()<br> defer s.startedLock.Unlock()<br> // 如果informer已经stop<br> if s.stopped {<br> return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)<br> }<br> .......<br> // 创建一个 processorListener 实例,用于处理事件通知。<br> // 该实例将使用给定的 handler、resyncPeriod、<br> // determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod) 计算的 resyncPeriod、<br> // s.clock.Now() 获取当前时间、initialBufferSize 初始缓冲区大小,<br> // 以及 s.HasSynced 方法来确定是否已同步。<br> listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)<br> // 如果 informer 尚未启动,则将 listener 添加到 processor 中,并返回处理程序句柄。<br> if !s.started {<br> return s.processor.addListener(listener), nil<br> }<br><br> // 为了安全地加入,我们需要<br> // 1. 停止发送添加/更新/删除通知<br> // 2. 对存储进行列表操作<br> // 3. 向新的处理程序发送合成的“添加”事件<br> // 4. 解锁<br> s.blockDeltas.Lock()<br> defer s.blockDeltas.Unlock()<br> <br> handle := s.processor.addListener(listener)<br> for _, item := range s.indexer.List() {<br> // 请注意,我们在持有锁的情况下排队这些通知<br> // 并在返回句柄之前。这意味着没有机会让任何人调用句柄的 HasSynced 方法<br> // 在一个会错误返回 true 的状态下(即,当<br> // 共享通知者已同步但尚未观察到一个带有 isInitialList 为 true 的 Add<br> // 事件,或者当处理通知的线程以某种方式比这个<br> // 添加它们的线程更快,计数器暂时为零)。<br> listener.add(addNotification{newObj: item, isInInitialList: true})<br> }<br> return handle, nil<br>}<br>
复制代码 由此可见,shardInformer具备了动态管理监听器(listener)的能力,支持以线程安全的方式添加新的处理器。
启动shardInformer
[code]func (f *sharedInformerFactory) Start(stopCh |