lianngkyle

k8s informer概述

我们都知道可以使用k8s的Clientset来获取所有的原生资源对象,那么怎么能持续的获取集群的所有资源对象,或监听集群的资源对象数据的变化呢?这里不需要轮询去不断执行List操作,而是调用Watch接口,即可监听资源对象的变化,当资源对象发生变化,客户端即可通过Watch接口收到资源对象的变化。

Watch接口虽然可以直接使用,但一般情况下很少直接使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本比较大。

也是因为如此,client-go提供了自己的实现机制,Informers应运而生。informers实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压力。Informers在启动的时候会首先在客户端调用List接口来获取全量的对象集合,然后通过Watch接口来获取增量的对象,然后更新本地缓存。

此外informers也有很强的健壮性,当长期运行的watch连接中断时,informers会尝试拉起一个新的watch请求来恢复连接,在不丢失任何事件的情况下恢复事件流。另外,informers还可以配置一个重新同步的周期参数,每间隔该周期,informers就会重新List全量数据。

在informers的使用上,通常每个GroupVersionResource(GVR)只实例化一个informers,但有时候我们在一个应用中往往会在多个地方对同一种资源对象都有informer的需求,所以就有了共享informer,即SharedInformerFactory。所以可以通过使用SharedInformerFactory来实例化informers,这样本地内存缓存就只有一份,通知机制也只有一套,大大提高了效率,减少了资源浪费。

k8s informer架构

k8s client-go informer主要包括以下部件:
(1)Reflector:Reflector从kube-apiserver中list&watch资源对象,然后调用DeltaFIFO的Add/Update/Delete/Replace方法将资源对象及其变化包装成Delta并将其丢到DeltaFIFO中;
(2)DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;
(3)Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的 Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;
(4)Processor:Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;
(5)Indexer:Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力;
(6)ResourceEventHandler:用户根据自身处理逻辑需要,注册自定义的的ResourceEventHandler,当对象发生变化时,将触发调用对应类型的ResourceEventHandler来做处理。

根据informer架构,对k8s informer的分析将分为以下几部分进行,本篇为概要分析:
(1)informer概要分析;
(2)informer之初始化与启动分析;
(3)informer之Reflector分析;
(4)informer之DeltaFIFO分析;
(5)informer之Controller&Processor分析;
(6)informer之Indexer分析;

informer使用示例代码

使用大致过程如下:
(1)构建与kube-apiserver通信的config配置;
(2)初始化与apiserver通信的clientset;
(3)利用clientset初始化shared informer factory以及pod informer;
(4)注册informer的自定义ResourceEventHandler;
(5)启动shared informer factory,开始informer的list & watch操作;
(6)等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中;
(7)创建lister,可以从informer中的indexer本地缓存中获取对象;

func main() {
    // 自定义与kube-apiserver通信的config配置
    master := "192.168.1.10" // apiserver url
    kubeconfig := "/.kube/config"
    config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
    if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}
	// 或使用k8s serviceAccount机制与kube-apiserver通信
	// config, err = rest.InClusterConfig()
    
    // 初始化与apiserver通信的clientset
    clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}
	
	// 初始化shared informer factory以及pod informer
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()
	
	// 注册informer的自定义ResourceEventHandler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    xxx,
		UpdateFunc: xxx,
		DeleteFunc: xxx,
	})
	
	// 启动shared informer factory,开始informer的list & watch操作
	stopper := make(chan struct{})
	go factory.Start(stopper)
	
	// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中 
	// 或者调用factory.WaitForCacheSync(stopper)
	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	
	// 创建lister
	podLister := podInformer.Lister()
	// 从informer中的indexer本地缓存中获取对象
	podList, err := podLister.List(labels.Everything())
	if err != nil {
		fmt.Println(err)
	}
	
}

总结

以上只是对K8s informer做了简单的介绍,以及简单的写了一下如何使用informer的示例代码,后面将开始对informer的各个部件做进一步的源码分析,敬请期待。

最后以一张k8s informer的架构图作为结尾总结,大家回忆一下k8s informer的架构组成以及各个部件的作用。

相关文章: