小结

概述

源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

Informer 这个词的出镜率很高,我们在很多文章里都可以看到 Informer 的身影,但是我们在源码里真的去找一个叫做 Informer 的对象,却又发现找不到一个单纯的 Informer,但是有很多结构体或者接口里包含了 Informer 这个词……

ReflectorWorkqueue 等组件不同,Informer 相对来说更加模糊,让人初读源码时感觉迷惑。今天我们一起来揭开 Informer 等面纱,看下到底什么是 Informer

《Kubernetes client-go 源码分析 - 开篇》中我们提到过 InformerDeltaFIFO 中 pop 相应对象,然后通过 Indexer 将对象和索引丢到本地 cache 中,再触发相应的事件处理函数(Resource Event Handlers)运行。Informer 在整个自定义控制器工作流程中的位置如下图所示,今天我们具体分析下 Informer 的源码实现。

Kubernetes client-go Informer 源码分析

Controller

Informer 通过一个 controller 对象来定义,本身很简单,长这样:

  • client-go/tools/cache/controller.go:89
1type controller struct {
2   config         Config
3   reflector      *Reflector
4   reflectorMutex sync.RWMutex
5   clock          clock.Clock
6}

这里有我们熟悉的 Reflector,可以猜到 Informer 启动的时候会去运行 Reflector,从而通过 Reflector 实现 list-watch apiserver,更新“事件”到 DeltaFIFO 中用于进一步处理。Config 对象等会再看,我们继续看下 controller 对应的接口:

  • client-go/tools/cache/controller.go:98
1type Controller interface {
2   Run(stopCh <-chan struct{})
3   HasSynced() bool
4   LastSyncResourceVersion() string
5}

这里的核心明显是 Run(stopCh <-chan struct{}) 方法,Run 负责两件事情:

  1. 构造 Reflector 利用 ListerWatcher 的能力将对象事件更新到 DeltaFIFO;
  2. 从 DeltaFIFO 中 Pop 对象然后调用 ProcessFunc 来处理;

Controller 的初始化

Controller 的 New 方法很简单:

  • client-go/tools/cache/controller.go:116
1func New(c *Config) Controller {
2   ctlr := &controller{
3      config: *c,
4      clock:  &clock.RealClock{},
5   }
6   return ctlr
7}

这里没有太多的逻辑,主要是传递了一个 Config 进来,可以猜到核心逻辑是 Config 从何而来以及后面如何使用。我们先向上跟一下 Config 从哪里来,New() 的调用有几个地方,我们不去看 newInformer() 分支的代码,因为实际开发中主要是使用 SharedIndexInformer,两个入口初始化 Controller 的逻辑类似,我们直接跟更实用的一个分支,看 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) 方法中如何调用的 New()

  • client-go/tools/cache/shared_informer.go:368
 1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
2    // ……
3    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
4        KnownObjects:          s.indexer,
5        EmitDeltaTypeReplaced: true,
6    })
7
8    cfg := &Config{
9        Queue:            fifo,
10        ListerWatcher:    s.listerWatcher,
11        ObjectType:       s.objectType,
12        FullResyncPeriod: s.resyncCheckPeriod,
13        RetryOnError:     false,
14        ShouldResync:     s.processor.shouldResync,
15
16        Process:           s.HandleDeltas,
17        WatchErrorHandler: s.watchErrorHandler,
18    }
19
20    func() {
21        s.startedLock.Lock()
22        defer s.startedLock.Unlock()
23
24        s.controller = New(cfg)
25        s.controller.(*controller).clock = s.clock
26        s.started = true
27    }()
28  // ……
29    s.controller.Run(stopCh)
30}

上面只保留了主要代码,我们后面会分析 SharedIndexInformer,所以这里先不纠结 SharedIndexInformer 的细节,我们从这里可以看到 SharedIndexInformer 的 Run() 过程里会构造一个 Config,然后创建 Controller,最后调用 Controller 的 Run() 方法。另外这里也可以看到我们前面系列文章里分析过的 DeltaFIFO、ListerWatcher 等,这里还有一个比较重要的是 Process:s.HandleDeltas, 这一行,Process 属性的类型是 ProcessFunc,这里可以看到具体的 ProcessFunc 是 HandleDeltas 方法。

Controller 的启动

上面提到 Controller 的初始化本身没有太多的逻辑,主要是构造了一个 Config 对象传递进来,所以 Controller 启动的时候肯定会有这个 Config 的使用逻辑,我们具体来看:

  • client-go/tools/cache/controller.go:127
 1func (c *controller) Run(stopCh <-chan struct{}) {
2   defer utilruntime.HandleCrash()
3   go func() {
4      <-stopCh
5      c.config.Queue.Close()
6   }()
7   // 利用 Config 里的配置构造 Reflector
8   r := NewReflector(
9      c.config.ListerWatcher,
10      c.config.ObjectType,
11      c.config.Queue,
12      c.config.FullResyncPeriod,
13   )
14   r.ShouldResync = c.config.ShouldResync
15   r.WatchListPageSize = c.config.WatchListPageSize
16   r.clock = c.clock
17   if c.config.WatchErrorHandler != nil {
18      r.watchErrorHandler = c.config.WatchErrorHandler
19   }
20
21   c.reflectorMutex.Lock()
22   c.reflector = r
23   c.reflectorMutex.Unlock()
24
25   var wg wait.Group
26   // 启动 Reflector
27   wg.StartWithChannel(stopCh, r.Run)
28   // 执行 Controller 的 processLoop
29   wait.Until(c.processLoop, time.Second, stopCh)
30   wg.Wait()
31}

这里的逻辑很简单,构造 Reflector 后运行起来,然后执行 c.processLoop,所以很明显,Controller 的业务逻辑肯定隐藏在 processLoop 方法里,我们继续来看。

processLoop

  • client-go/tools/cache/controller.go:181
 1func (c *controller) processLoop() {
2   for {
3      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
4      if err != nil {
5         if err == ErrFIFOClosed {
6            return
7         }
8         if c.config.RetryOnError {
9            c.config.Queue.AddIfNotPresent(obj)
10         }
11      }
12   }
13}

这里的逻辑是从 DeltaFIFO 中 Pop 出一个对象丢给 PopProcessFunc 处理,如果失败了就 re-enqueue 到 DeltaFIFO 中。我们前面提到过这里的 PopProcessFunc 实现是 HandleDeltas() 方法,所以这里的主要逻辑就转到了 HandleDeltas() 是如何实现的了。

HandleDeltas()

这里我们先回顾下 DeltaFIFO 的存储结构,看下这个图:

Kubernetes client-go Informer 源码分析

然后再看源码,这里的逻辑主要是遍历一个 Deltas 里的所有 Delta,然后根据 Delta 的类型来决定如何操作 Indexer,也就是更新本地 cache,同时分发相应的通知。

  • client-go/tools/cache/shared_informer.go:537
 1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
2   s.blockDeltas.Lock()
3   defer s.blockDeltas.Unlock()
4   // 对于每个 Deltas 来说,里面存了很多的 Delta,也就是对应不同 Type 的多个 Object,这里的遍历会从旧往新走
5   for _, d := range obj.(Deltas) {
6      switch d.Type {
7      // 除了 Deleted 外所有情况
8      case Sync, Replaced, Added, Updated:
9         // 记录变更,没有太多实际作用
10         s.cacheMutationDetector.AddObject(d.Object)
11         // 通过 indexer 从 cache 里查询当前 Object,如果存在
12         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
13            // 更新 indexer 里的对象
14            if err := s.indexer.Update(d.Object); err != nil {
15               return err
16            }
17
18            isSync := false
19            switch {
20            case d.Type == Sync:
21               isSync = true
22            case d.Type == Replaced:
23               if accessor, err := meta.Accessor(d.Object); err == nil {
24                  if oldAccessor, err := meta.Accessor(old); err == nil {
25                     isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
26                  }
27               }
28            }
29            // 分发一个更新通知
30            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
31           // 如果本地 cache 里没有这个 Object,则添加
32         } else {
33            if err := s.indexer.Add(d.Object); err != nil {
34               return err
35            }
36            // 分发一个新增通知
37            s.processor.distribute(addNotification{newObj: d.Object}, false)
38         }
39        // 如果是删除操作,则从 indexer 里删除这个 Object,然后分发一个删除通知
40      case Deleted:
41         if err := s.indexer.Delete(d.Object); err != nil {
42            return err
43         }
44         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
45      }
46   }
47   return nil
48}

这里涉及到一个知识点:s.processor.distribute(addNotification{newObj: d.Object}, false) 中 processor 是什么?如何分发通知的?谁来接收通知?

我们回到 ProcessFunc 的实现上,除了 sharedIndexInformerHandleDeltas() 方法外,还有一个基础版本:

  • client-go/tools/cache/controller.go:446
 1        Process: func(obj interface{}) error {
2            for _, d := range obj.(Deltas) {
3                obj := d.Object
4                if transformer != nil {
5                    var err error
6                    obj, err = transformer(obj)
7                    if err != nil {
8                        return err
9                    }
10                }
11
12                switch d.Type {
13                case Sync, Replaced, Added, Updated:
14                    if old, exists, err := clientState.Get(obj); err == nil && exists {
15                        if err := clientState.Update(obj); err != nil {
16                            return err
17                        }
18                        h.OnUpdate(old, obj)
19                    } else {
20                        if err := clientState.Add(obj); err != nil {
21                            return err
22                        }
23                        h.OnAdd(obj)
24                    }
25                case Deleted:
26                    if err := clientState.Delete(obj); err != nil {
27                        return err
28                    }
29                    h.OnDelete(obj)
30                }
31            }
32            return nil
33        },

这里可以看到逻辑简单很多,除了更新 cache 外,调用了 h.OnAdd(obj)/h.OnUpdate(old, obj)/h.OnDelete(obj) 等方法,这里的 h 是 ResourceEventHandler 类型的,也就是 Process 过程直接调用了 ResourceEventHandler 的相应方法,这样就已经逻辑闭环了,ResourceEventHandler 的这几个方法里做一些简单的过滤后,会将这些对象的 key 丢到 workqueue,进而就触发了自定义调谐函数的运行。

换言之,sharedIndexInformer 中实现的 ProcessFunc 是一个进阶版本,不满足于简单调用 ResourceEventHandler 对应方法来完成 Process 逻辑,所以到这里基础的 Informer 逻辑已经闭环了,我们后面继续来看 sharedIndexInformer 中又对 Informer 做了哪些“增强”

SharedIndexInformer

我们在 Operator 开发中,如果不使用 controller-runtime 库,也就是不通过 Kubebuilder 等工具来生成脚手架时,经常会用到 SharedInformerFactory,比如典型的 sample-controller 中的 main() 函数:

  • sample-controller/main.go:40
 1func main() {
2    klog.InitFlags(nil)
3    flag.Parse()
4
5    stopCh := signals.SetupSignalHandler()
6
7    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
8    if err != nil {
9        klog.Fatalf("Error building kubeconfig: %s", err.Error())
10    }
11
12    kubeClient, err := kubernetes.NewForConfig(cfg)
13    if err != nil {
14        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
15    }
16
17    exampleClient, err := clientset.NewForConfig(cfg)
18    if err != nil {
19        klog.Fatalf("Error building example clientset: %s", err.Error())
20    }
21
22    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
23    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
24
25    controller := NewController(kubeClient, exampleClient,
26        kubeInformerFactory.Apps().V1().Deployments(),
27        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
28
29    kubeInformerFactory.Start(stopCh)
30    exampleInformerFactory.Start(stopCh)
31
32    if err = controller.Run(2, stopCh); err != nil {
33        klog.Fatalf("Error running controller: %s", err.Error())
34    }
35}

这里可以看到我们依赖于 kubeInformerFactory.Apps().V1().Deployments() 提供一个 Informer,这里的 Deployments() 方法返回的是一个 DeploymentInformer 类型,DeploymentInformer 是什么呢?如下

  • client-go/informers/apps/v1/deployment.go:37
1type DeploymentInformer interface {
2    Informer() cache.SharedIndexInformer
3    Lister() v1.DeploymentLister
4}

可以看到所谓的 DeploymentInformer 由 “Informer” 和 “Lister” 组成,也就是说我们编码时用到的 Informer 本质就是一个 SharedIndexInformer

  • client-go/tools/cache/shared_informer.go:186
1type SharedIndexInformer interface {
2   SharedInformer
3   AddIndexers(indexers Indexers) error
4   GetIndexer() Indexer
5}

这里的 Indexer 就很熟悉了,SharedInformer 又是啥呢?

  • client-go/tools/cache/shared_informer.go:133
 1type SharedInformer interface {
2   // 可以添加自定义的 ResourceEventHandler
3   AddEventHandler(handler ResourceEventHandler)
4   // 附带 resync 间隔配置,设置为 0 表示不关心 resync
5   AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
6   // 这里的 Store 指的是 Indexer
7   GetStore() Store
8   // 过时了,没有用
9   GetController() Controller
10   // 通过 Run 来启动
11   Run(stopCh <-chan struct{})
12   // 这里和 resync 逻辑没有关系,表示 Indexer 至少更新过一次全量的对象
13   HasSynced() bool
14   // 最后一次拿到的 RV
15   LastSyncResourceVersion() string
16   // 用于每次 ListAndWatch 连接断开时回调,主要就是日志记录的作用
17   SetWatchErrorHandler(handler WatchErrorHandler) error
18}

sharedIndexerInformer

接下来就该看下 SharedIndexInformer 接口的实现了,sharedIndexerInformer 定义如下:

  • client-go/tools/cache/shared_informer.go:287
 1type sharedIndexInformer struct {
2   indexer    Indexer
3   controller Controller
4   processor             *sharedProcessor
5   cacheMutationDetector MutationDetector
6   listerWatcher ListerWatcher
7   // 表示当前 Informer 期望关注的类型,主要是 GVK 信息
8   objectType runtime.Object
9   // reflector 的 resync 计时器计时间隔,通知所有的 listener 执行 resync
10   resyncCheckPeriod time.Duration
11   defaultEventHandlerResyncPeriod time.Duration
12   clock clock.Clock
13   started, stopped bool
14   startedLock      sync.Mutex
15   blockDeltas sync.Mutex
16   watchErrorHandler WatchErrorHandler
17}

这里的 Indexer、Controller、ListerWatcher 等都是我们熟悉的组件,sharedProcessor 我们在前面遇到了,需要重点关注一下。

sharedProcessor

sharedProcessor 中维护了 processorListener 集合,然后分发通知对象到这些 listeners,先看下结构定义:

  • client-go/tools/cache/shared_informer.go:588
1type sharedProcessor struct {
2   listenersStarted bool
3   listenersLock    sync.RWMutex
4   listeners        []*processorListener
5   syncingListeners []*processorListener
6   clock            clock.Clock
7   wg               wait.Group
8}

马上就会有一个疑问了,processorListener 是什么?

processorListener

  • client-go/tools/cache/shared_informer.go:690
 1type processorListener struct {
2   nextCh chan interface{}
3   addCh  chan interface{}
4   // 核心属性
5   handler ResourceEventHandler
6   pendingNotifications buffer.RingGrowing
7   requestedResyncPeriod time.Duration
8   resyncPeriod time.Duration
9   nextResync time.Time
10   resyncLock sync.Mutex
11}

可以看到 processorListener 里有一个 ResourceEventHandler,这是我们认识的组件。processorListener 有三个主要方法:

  • add(notification interface{})
  • pop()
  • run()

一个个来看吧。

run()

  • client-go/tools/cache/shared_informer.go:775
 1func (p *processorListener) run() {
2   stopCh := make(chan struct{})
3   wait.Until(func() {
4      for next := range p.nextCh {
5         switch notification := next.(type) {
6         case updateNotification:
7            p.handler.OnUpdate(notification.oldObj, notification.newObj)
8         case addNotification:
9            p.handler.OnAdd(notification.newObj)
10         case deleteNotification:
11            p.handler.OnDelete(notification.oldObj)
12         default:
13            utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
14         }
15      }
16      close(stopCh)
17   }, 1*time.Second, stopCh)
18}

这里的逻辑很清晰,从 nextCh 里拿通知,然后根据其类型去调用 ResourceEventHandler 相应的 OnAdd/OnUpdate/OnDelete 方法。

add() 和 pop()

  • client-go/tools/cache/shared_informer.go:741
 1func (p *processorListener) add(notification interface{}) {
2   // 将通知放到 addCh 中,所以下面 pop() 方法里先执行到的 case 是第二个
3   p.addCh <- notification
4}
5
6func (p *processorListener) pop() {
7   defer utilruntime.HandleCrash()
8   defer close(p.nextCh) // Tell .run() to stop
9
10   var nextCh chan<- interface{}
11   var notification interface{}
12   for {
13      select {
14        // 下面获取到的通知,添加到 nextCh 里,供 run() 方法中消费
15      case nextCh <- notification:
16         var ok bool
17         // 从 pendingNotifications 里消费通知,生产者在下面 case 里
18         notification, ok = p.pendingNotifications.ReadOne()
19         if !ok {
20            nextCh = nil
21         }
22        // 逻辑从这里开始,从 addCh 里提取通知
23      case notificationToAdd, ok := <-p.addCh:
24         if !ok {
25            return
26         }
27         if notification == nil { 
28            notification = notificationToAdd
29            nextCh = p.nextCh
30         } else {
31            // 新添加的通知丢到 pendingNotifications
32            p.pendingNotifications.WriteOne(notificationToAdd)
33         }
34      }
35   }
36}

也就是说 processorListener 提供了一定的缓冲机制来接收 notification,然后去消费这些 notification 调用 ResourceEventHandler 相关方法。

然后接着继续看 sharedProcessor 的几个主要方法。

sharedProcessor.addListener()

addListener 会直接调用 listenerrun()pop() 方法,这两个方法的逻辑我们上面已经分析过

  • client-go/tools/cache/shared_informer.go:597
 1func (p *sharedProcessor) addListener(listener *processorListener) {
2   p.listenersLock.Lock()
3   defer p.listenersLock.Unlock()
4
5   p.addListenerLocked(listener)
6   if p.listenersStarted {
7      p.wg.Start(listener.run)
8      p.wg.Start(listener.pop)
9   }
10}

sharedProcessor.distribute()

distribute 的逻辑就是调用 sharedProcessor 内部维护的所有 listner 的 add() 方法

  • client-go/tools/cache/shared_informer.go:613
 1func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
2   p.listenersLock.RLock()
3   defer p.listenersLock.RUnlock()
4
5   if sync {
6      for _, listener := range p.syncingListeners {
7         listener.add(obj)
8      }
9   } else {
10      for _, listener := range p.listeners {
11         listener.add(obj)
12      }
13   }
14}

sharedProcessor.run()

run() 的逻辑和前面的 addListener() 类似,也就是调用 listenerrun()pop() 方法

  • client-go/tools/cache/shared_informer.go:628
 1func (p *sharedProcessor) run(stopCh <-chan struct{}) {
2    func() {
3        p.listenersLock.RLock()
4        defer p.listenersLock.RUnlock()
5        for _, listener := range p.listeners {
6            p.wg.Start(listener.run)
7            p.wg.Start(listener.pop)
8        }
9        p.listenersStarted = true
10    }()
11    <-stopCh
12    p.listenersLock.RLock()
13    defer p.listenersLock.RUnlock()
14    for _, listener := range p.listeners {
15        close(listener.addCh)
16    }
17    p.wg.Wait()
18}

到这里基本就知道 sharedProcessor 的能力了,继续往下看。

sharedIndexInformer.Run()

继续来看 sharedIndexInformerRun() 方法,这里面已经几乎没有陌生的内容了。

  • client-go/tools/cache/shared_informer.go:368
 1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
2   defer utilruntime.HandleCrash()
3
4   if s.HasStarted() {
5      klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
6      return
7   }
8   // DeltaFIFO 就很熟悉了
9   fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
10      KnownObjects:          s.indexer,
11      EmitDeltaTypeReplaced: true,
12   })
13   // Config 的逻辑也在上面遇到过了
14   cfg := &Config{
15      Queue:            fifo,
16      ListerWatcher:    s.listerWatcher,
17      ObjectType:       s.objectType,
18      FullResyncPeriod: s.resyncCheckPeriod,
19      RetryOnError:     false,
20      ShouldResync:     s.processor.shouldResync,
21
22      Process:           s.HandleDeltas,
23      WatchErrorHandler: s.watchErrorHandler,
24   }
25
26   func() {
27      s.startedLock.Lock()
28      defer s.startedLock.Unlock()
29      // 前文分析过这里的 New() 函数逻辑了
30      s.controller = New(cfg)
31      s.controller.(*controller).clock = s.clock
32      s.started = true
33   }()
34
35   processorStopCh := make(chan struct{})
36   var wg wait.Group
37   defer wg.Wait()              
38   defer close(processorStopCh) 
39   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
40   // processor 的 run 方法
41   wg.StartWithChannel(processorStopCh, s.processor.run)
42
43   defer func() {
44      s.startedLock.Lock()
45      defer s.startedLock.Unlock()
46      s.stopped = true // Don't want any new listeners
47   }()
48   // controller 的 Run()
49   s.controller.Run(stopCh)
50}

到这里也就基本知道了 sharedIndexInformer 的逻辑了,再往上层走就剩下一个 SharedInformerFactory 了,继续看吧~

SharedInformerFactory

我们前面提到过 SharedInformerFactory,现在具体来看一下 SharedInformerFactory 是怎么实现的。先看接口定义:

  • client-go/informers/factory.go:187
 1type SharedInformerFactory interface {
2   internalinterfaces.SharedInformerFactory
3   ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
4   WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
5
6   Admissionregistration() admissionregistration.Interface
7   Internal() apiserverinternal.Interface
8   Apps() apps.Interface
9   Autoscaling() autoscaling.Interface
10   Batch() batch.Interface
11   Certificates() certificates.Interface
12   Coordination() coordination.Interface
13   Core() core.Interface
14   Discovery() discovery.Interface
15   Events() events.Interface
16   Extensions() extensions.Interface
17   Flowcontrol() flowcontrol.Interface
18   Networking() networking.Interface
19   Node() node.Interface
20   Policy() policy.Interface
21   Rbac() rbac.Interface
22   Scheduling() scheduling.Interface
23   Storage() storage.Interface
24}

这里涉及到几个点:

  1. internalinterfaces.SharedInformerFactory

这也是一个接口,比较简短:

1type SharedInformerFactory interface {
2   Start(stopCh <-chan struct{})
3   InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
4}

可以看到熟悉的 SharedIndexInformer

  1. ForResource(resource schema.GroupVersionResource) (GenericInformer, error)

这里接收一个 GVR,返回了一个 GenericInformer,看下什么是 GenericInformer:

1type GenericInformer interface {
2   Informer() cache.SharedIndexInformer
3   Lister() cache.GenericLister
4}

也很简短。

  1. Apps() apps.Interface 等

后面一堆方法是类似的,我们以 Apps() 为例来看下怎么回事。这里的 Interface 定义如下:

  • client-go/informers/apps/interface.go:29
1type Interface interface {
2   // V1 provides access to shared informers for resources in V1.
3   V1() v1.Interface
4   // V1beta1 provides access to shared informers for resources in V1beta1.
5   V1beta1() v1beta1.Interface
6   // V1beta2 provides access to shared informers for resources in V1beta2.
7   V1beta2() v1beta2.Interface
8}

显然应该继续看下 v1.Interface 是个啥。

  • client-go/informers/apps/v1/interface.go:26
 1type Interface interface {
2   // ControllerRevisions returns a ControllerRevisionInformer.
3   ControllerRevisions() ControllerRevisionInformer
4   // DaemonSets returns a DaemonSetInformer.
5   DaemonSets() DaemonSetInformer
6   // Deployments returns a DeploymentInformer.
7   Deployments() DeploymentInformer
8   // ReplicaSets returns a ReplicaSetInformer.
9   ReplicaSets() ReplicaSetInformer
10   // StatefulSets returns a StatefulSetInformer.
11   StatefulSets() StatefulSetInformer
12}

到这里已经有看着很眼熟的 Deployments() DeploymentInformer 之类的代码了,DeploymentInformer 我们刚才看过内部结构,长这样:

1type DeploymentInformer interface {
2   Informer() cache.SharedIndexInformer
3   Lister() v1.DeploymentLister
4}

到这里也就不难理解 SharedInformerFactory 的作用了,它提供了所有 API group-version 的资源对应的 SharedIndexInformer,也就不难理解开头我们引用的 sample-controller 中的这行代码:

1kubeInformerFactory.Apps().V1().Deployments()

通过其可以拿到一个 Deployment 资源对应的 SharedIndexInformer。

NewSharedInformerFactory

继续看下 SharedInformerFactory 是如何创建的

  • client-go/informers/factory.go:96
1func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
2   return NewSharedInformerFactoryWithOptions(client, defaultResync)
3}

可以看到参数非常简单,主要是需要一个 Clientset,毕竟 ListerWatcher 的能力本质还是 client 提供的。

  • client-go/informers/factory.go:109
 1func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
2   factory := &sharedInformerFactory{
3      client:           client,
4      namespace:        v1.NamespaceAll, // 空字符串 ""
5      defaultResync:    defaultResync,
6      informers:        make(map[reflect.Type]cache.SharedIndexInformer), // 可以存放不同类型的 SharedIndexInformer
7      startedInformers: make(map[reflect.Type]bool),
8      customResync:     make(map[reflect.Type]time.Duration),
9   }
10
11   for _, opt := range options {
12      factory = opt(factory)
13   }
14
15   return factory
16}

接着是如何启动

sharedInformerFactory.Start()

  • client-go/informers/factory.go:128
 1func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
2   f.lock.Lock()
3   defer f.lock.Unlock()
4
5   for informerType, informer := range f.informers {
6      // 同类型只会调用一次,Run() 的逻辑我们前面介绍过了
7      if !f.startedInformers[informerType] {
8         go informer.Run(stopCh)
9         f.startedInformers[informerType] = true
10      }
11   }
12}

小结

今天我们一个基础 Informer - Controller 开始介绍,先分析了 Controller 的能力,也就是其通过构造 Reflector 并启动从而能够获取指定类型资源的“更新”事件,然后通过事件构造 Delta 放到 DeltaFIFO 中,进而在 processLoop 中从 DeltaFIFO 里 pop Deltas 来处理,一方面将对象通过 Indexer 同步到本地 cache,也就是一个 ThreadSafeStore,一方面调用 ProcessFunc 来处理这些 Delta。

然后 SharedIndexInformer 提供了构造 Controller 的能力,通过 HandleDeltas() 方法实现上面提到的 ProcessFunc,同时还引入了 sharedProcessor 在 HandleDeltas() 中用于事件通知的处理。sharedProcessor 分发事件通知的时候,接收方是内部继续抽象出来的 processorListener,在 processorListener 中完成了 ResourceEventHandler 具体回调函数的调用。

最后 SharedInformerFactory 又进一步封装了提供所有 api 资源对应的 SharedIndexInformer 的能力。也就是说一个 SharedIndexInformer 可以处理一种类型的资源,比如 Deployment 或者 Pod 等,而通过 SharedInformerFactory 可以轻松构造任意已知类型的 SharedIndexInformer。另外这里用到了 Clientset 提供的访问所有 api 资源的能力,通过其也就能够完整实现整套 Informer 逻辑了。

此前我们已经陆续分析了:

各种“组件”分工明确,最终汇聚在 “Informer” 里,实现了一套复杂而优雅的资源处理能力,至此自定义控制器涉及逻辑中 client-go 部分就基本分析完了!

【完整目录参见>>> 《深入理解 K8S 原理与实现》系列目录 <<<

(转载请保留本文原始链接 https://www.danielhu.cn)

Kubernetes client-go Informer 源码分析

 

 

分类:

技术点:

相关文章: