• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Kubernetes client-go 源码分析 - Reflector

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

概述入口 - Reflector.Run()核心 - Reflector.ListAndWatch()Reflector.watchHandler()NewReflector()小结

概述

源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

回顾一下 Reflector 在整个自定义控制器工作流中的位置:

《Kubernetes client-go 源码分析 - 开篇》中我们提到过 Reflector 的任务就是向 apiserver watch 特定类型的资源,拿到变更通知后将其丢到 DeltaFIFO 队列中。另外前面已经在 《Kubernetes client-go 源码分析 - ListWatcher》中分析过 ListWatcher 是如何从 apiserver 中 list-watch 资源的,今天我们继续来看 Reflector 的实现。

入口 - Reflector.Run()

Reflector 的启动入口是 Run() 方法:

  • client-go/tools/cache/reflector.go:218
1func (r *Reflector) Run(stopCh <-chan struct{}) {
2   klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
3   wait.BackoffUntil(func() {
4      if err := r.ListAndWatch(stopCh); err != nil {
5         r.watchErrorHandler(r, err)
6      }
7   }, r.backoffManager, true, stopCh)
8   klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
9}

这里有一些健壮性机制,用于处理 apiserver 短暂失联的场景。我们直接来看主要逻辑先,也就是 Reflector.ListAndWatch() 方法的内容。

核心 - Reflector.ListAndWatch()

Reflector.ListAndWatch() 方法有将近 200 行,是 Reflector 的核心逻辑之一。ListAndWatch() 方法做的事情是先 list 特定资源的所有对象,然后获取其资源版本,接着使用这个资源版本来开始 watch 流程。watch 到新版本资源然后将其加入 DeltaFIFO 的动作是在 watchHandler() 方法中具体实现的,后面一节会单独分析。在此之前 list 到的最新 items 会通过 syncWith() 方法添加一个 Sync 类型的 DeltaType 到 DeltaFIFO 中,所以 list 操作本身也会触发后面的调谐逻辑运行。具体来看:

  • client-go/tools/cache/reflector.go:254
  1func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
2   klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
3   var resourceVersion string
4
5   // 当 r.lastSyncResourceVersion 为 "" 时这里为 "0",当使用 r.lastSyncResourceVersion 失败时这里为 ""
6   // 区别是 "" 会直接请求到 etcd,获取一个最新的版本,而 "0" 访问的是 cache
7   options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
8
9   if err := func() error {
10      // trace 是用于记录操作耗时的,这里的逻辑是超过 10s 的步骤打印出来
11      initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
12      defer initTrace.LogIfLong(10 * time.Second)
13      var list runtime.Object
14      var paginatedResult bool
15      var err error
16      listCh := make(chan struct{}, 1)
17      panicCh := make(chan interface{}, 1)
18      go func() { // 内嵌一个函数,这里会直接调用
19         defer func() {
20            if r := recover(); r != nil { // 收集这个 goroutine panic 的时候将奔溃信息
21               panicCh <- r
22            }
23         }()
24         // 开始尝试收集 list 的 chunks,我们在 《Kubernetes List-Watch 机制原理与实现 - chunked》中介绍过相关逻辑
25         pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
26            return r.listerWatcher.List(opts)
27         }))
28         switch {
29         case r.WatchListPageSize != 0:
30            pager.PageSize = r.WatchListPageSize
31         case r.paginatedResult:
32         case options.ResourceVersion != "" && options.ResourceVersion != "0":
33            pager.PageSize = 0
34         }
35
36         list, paginatedResult, err = pager.List(context.Background(), options)
37         if isExpiredError(err) || isTooLargeResourceVersionError(err) {
38            // 设置这个属性后,下一次 list 会从 etcd 里取
39            r.setIsLastSyncResourceVersionUnavailable(true)
40            list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
41         }
42         close(listCh)
43      }()
44      select {
45      case <-stopCh:
46         return nil
47      case r := <-panicCh:
48         panic(r)
49      case <-listCh:
50      }
51      if err != nil {
52         return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
53      }
54
55      if options.ResourceVersion == "0" && paginatedResult {
56         r.paginatedResult = true
57      }
58
59      // list 成功
60      r.setIsLastSyncResourceVersionUnavailable(false)
61      initTrace.Step("Objects listed")
62      listMetaInterface, err := meta.ListAccessor(list)
63      if err != nil {
64         return fmt.Errorf("unable to understand list result %#v: %v", list, err)
65      }
66      resourceVersion = listMetaInterface.GetResourceVersion()
67      initTrace.Step("Resource version extracted")
68      items, err := meta.ExtractList(list)
69      if err != nil {
70         return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
71      }
72      initTrace.Step("Objects extracted")
73      // 将 list 到的 items 添加到 store 里,这里是 store 也就是 DeltaFIFO,也就是添加一个 Sync DeltaType 这里的 resourveVersion 并没有用到
74      if err := r.syncWith(items, resourceVersion); err != nil {
75         return fmt.Errorf("unable to sync list result: %v", err)
76      }
77      initTrace.Step("SyncWith done")
78      r.setLastSyncResourceVersion(resourceVersion)
79      initTrace.Step("Resource version updated")
80      return nil
81   }(); err != nil {
82      return err
83   }
84
85   resyncerrc := make(chan error, 1)
86   cancelCh := make(chan struct{})
87   defer close(cancelCh)
88   go func() {
89      resyncCh, cleanup := r.resyncChan()
90      defer 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
linux下go安装/卸载重装发布时间:2022-07-10
下一篇:
Go语言核心36讲(Go语言进阶技术三)--学习笔记发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap