概述入口 - Reflector.Run()核心 - Reflector.ListAndWatch()Reflector.watchHandler()NewReflector()小结
概述
源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)
回顾一下 Reflector 在整个自定义控制器工作流中的位置:
在《Kubernetes client-go 源码分析 - 开篇》中我们提到过 Reflector 的任务就是向 apiserver watch 特定类型的资源,拿到变更通知后将其丢到 DeltaFIFO 队列中。另外前面已经在 《Kubernetes client-go 源码分析 - ListWatcher》中分析过 ListWatcher 是如何从 apiserver 中 list-watch 资源的,今天我们继续来看 Reflector 的实现。
入口 - Reflector.Run()
Reflector 的启动入口是 Run()
方法:
- client-go/tools/cache/reflector.go:218
1func (r *Reflector) Run(stopCh <-chan struct{}) {
2 klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
3 wait.BackoffUntil(func() {
4 if err := r.ListAndWatch(stopCh); err != nil {
5 r.watchErrorHandler(r, err)
6 }
7 }, r.backoffManager, true, stopCh)
8 klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
9}
这里有一些健壮性机制,用于处理 apiserver 短暂失联的场景。我们直接来看主要逻辑先,也就是 Reflector.ListAndWatch()
方法的内容。
核心 - Reflector.ListAndWatch()
Reflector.ListAndWatch()
方法有将近 200 行,是 Reflector 的核心逻辑之一。ListAndWatch() 方法做的事情是先 list 特定资源的所有对象,然后获取其资源版本,接着使用这个资源版本来开始 watch 流程。watch 到新版本资源然后将其加入 DeltaFIFO 的动作是在 watchHandler() 方法中具体实现的,后面一节会单独分析。在此之前 list 到的最新 items 会通过 syncWith() 方法添加一个 Sync 类型的 DeltaType 到 DeltaFIFO 中,所以 list 操作本身也会触发后面的调谐逻辑运行。具体来看:
- client-go/tools/cache/reflector.go:254
1func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
2 klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
3 var resourceVersion string
4
5 // 当 r.lastSyncResourceVersion 为 "" 时这里为 "0",当使用 r.lastSyncResourceVersion 失败时这里为 ""
6 // 区别是 "" 会直接请求到 etcd,获取一个最新的版本,而 "0" 访问的是 cache
7 options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
8
9 if err := func() error {
10 // trace 是用于记录操作耗时的,这里的逻辑是超过 10s 的步骤打印出来
11 initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
12 defer initTrace.LogIfLong(10 * time.Second)
13 var list runtime.Object
14 var paginatedResult bool
15 var err error
16 listCh := make(chan struct{}, 1)
17 panicCh := make(chan interface{}, 1)
18 go func() { // 内嵌一个函数,这里会直接调用
19 defer func() {
20 if r := recover(); r != nil { // 收集这个 goroutine panic 的时候将奔溃信息
21 panicCh <- r
22 }
23 }()
24 // 开始尝试收集 list 的 chunks,我们在 《Kubernetes List-Watch 机制原理与实现 - chunked》中介绍过相关逻辑
25 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
26 return r.listerWatcher.List(opts)
27 }))
28 switch {
29 case r.WatchListPageSize != 0:
30 pager.PageSize = r.WatchListPageSize
31 case r.paginatedResult:
32 case options.ResourceVersion != "" && options.ResourceVersion != "0":
33 pager.PageSize = 0
34 }
35
36 list, paginatedResult, err = pager.List(context.Background(), options)
37 if isExpiredError(err) || isTooLargeResourceVersionError(err) {
38 // 设置这个属性后,下一次 list 会从 etcd 里取
39 r.setIsLastSyncResourceVersionUnavailable(true)
40 list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
41 }
42 close(listCh)
43 }()
44 select {
45 case <-stopCh:
46 return nil
47 case r := <-panicCh:
48 panic(r)
49 case <-listCh:
50 }
51 if err != nil {
52 return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
53 }
54
55 if options.ResourceVersion == "0" && paginatedResult {
56 r.paginatedResult = true
57 }
58
59 // list 成功
60 r.setIsLastSyncResourceVersionUnavailable(false)
61 initTrace.Step("Objects listed")
62 listMetaInterface, err := meta.ListAccessor(list)
63 if err != nil {
64 return fmt.Errorf("unable to understand list result %#v: %v", list, err)
65 }
66 resourceVersion = listMetaInterface.GetResourceVersion()
67 initTrace.Step("Resource version extracted")
68 items, err := meta.ExtractList(list)
69 if err != nil {
70 return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
71 }
72 initTrace.Step("Objects extracted")
73 // 将 list 到的 items 添加到 store 里,这里是 store 也就是 DeltaFIFO,也就是添加一个 Sync DeltaType 这里的 resourveVersion 并没有用到
74 if err := r.syncWith(items, resourceVersion); err != nil {
75 return fmt.Errorf("unable to sync list result: %v", err)
76 }
77 initTrace.Step("SyncWith done")
78 r.setLastSyncResourceVersion(resourceVersion)
79 initTrace.Step("Resource version updated")
80 return nil
81 }(); err != nil {
82 return err
83 }
84
85 resyncerrc := make(chan error, 1)
86 cancelCh := make(chan struct{})
87 defer close(cancelCh)
88 go func() {
89 resyncCh, cleanup := r.resyncChan()
90 defer
全部评论
请发表评论