Replace()
概述
源码版本信息
- Project: kubernetes
- Branch: master
- Last commit id: d25d741c
- Date: 2021-09-26
我们在《Kubernetes client-go 源码分析 - 开篇》里提到了自定义控制器涉及到的 client-go 组件整体工作流程,大致如下图:
DeltaFIFO 是上面的一个重要组件,今天我们来详细研究下 client-go 里 DeltaFIFO 相关代码。
Queue 接口
类似 workqueue 里的队列概念,这里也有一个队列,Queue 接口定义在 client-go/tools/cache 包中的 fifo.go 文件里,看下有哪些方法:
1type Queue interface {
2 Store
3 Pop(PopProcessFunc) (interface{}, error)
4 AddIfNotPresent(interface{}) error
5 HasSynced() bool
6 Close()
7}
这里嵌里一个 Store 接口,对应定义如下:
1type Store interface {
2 Add(obj interface{}) error
3 Update(obj interface{}) error
4 Delete(obj interface{}) error
5 List() []interface{}
6 ListKeys() []string
7 Get(obj interface{}) (item interface{}, exists bool, err error)
8 GetByKey(key string) (item interface{}, exists bool, err error)
9 Replace([]interface{}, string) error
10 Resync() error
11}
Store 接口的方法都比较直观,Store 的实现有很多,我们等下看 Queue 里用到的是哪个实现。
Queue 接口的实现是 FIFO 和 DeltaFIFO 两个类型,我们在 Informer 里用到的是 DeltaFIFO,而 DeltaFIFO 也没有依赖 FIFO,所以下面我们直接看 DeltaFIFO 是怎么实现的。
DeltaFIFO
- client-go/tools/cache/delta_fifo.go:97
1type DeltaFIFO struct {
2 lock sync.RWMutex
3 cond sync.Cond
4 items map[string]Deltas
5 queue []string
6 populated bool
7 initialPopulationCount int
8 keyFunc KeyFunc
9 knownObjects KeyListerGetter
10 closed bool
11 emitDeltaTypeReplaced bool
12}
这里有一个 Deltas 类型,看下具体的定义:
1type Deltas []Delta
2
3type Delta struct {
4 Type DeltaType
5 Object interface{}
6}
7
8type DeltaType string
9
10const (
11 Added DeltaType = "Added"
12 Updated DeltaType = "Updated"
13 Deleted DeltaType = "Deleted"
14 Replaced DeltaType = "Replaced"
15 Sync DeltaType = "Sync"
16)
可以看到 Delta 结构体保存的是 DeltaType(就是一个字符串)和发生了这种 Delta 的具体对象。
DeltaFIFO 内部主要维护的一个队列和一个 map,直观一点表示如下:
DeltaFIFO 的 New 函数是 NewDeltaFIFOWithOptions()
- client-go/tools/cache/delta_fifo.go:218
1func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
2 if opts.KeyFunction == nil {
3 opts.KeyFunction = MetaNamespaceKeyFunc
4 }
5
6 f := &DeltaFIFO{
7 items: map[string]Deltas{},
8 queue: []string{},
9 keyFunc: opts.KeyFunction,
10 knownObjects: opts.KnownObjects,
11
12 emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
13 }
14 f.cond.L = &f.lock
15 return f
16}
元素增删改 - queueActionLocked()
可以注意到 DeltaFIFO 的 Add() 等方法等方法体都很简短,大致这样:
1func (f *DeltaFIFO) Add(obj interface{}) error {
2 f.lock.Lock()
3 defer f.lock.Unlock()
4 f.populated = true
5 return f.queueActionLocked(Added, obj)
6}
里面的逻辑就是调用 queueActionLocked()
方法传递对应的 DeltaType 进去,前面提到过 DeltaType 就是 Added、Updated、Deleted 等字符串,所以我们直接先看 queueActionLocked()
方法的实现。
- client-go/tools/cache/delta_fifo.go:409
1func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
2 id, err := f.KeyOf(obj)
3 if err != nil {
4 return KeyError{obj, err}
5 }
6 oldDeltas := f.items[id]
7 newDeltas := append(oldDeltas, Delta{actionType, obj})
8 newDeltas = dedupDeltas(newDeltas)
9
10 if len(newDeltas) > 0 {
11 if _, exists := f.items[id]; !exists {
12 f.queue = append(f.queue, id)
13 }
14 f.items[id] = newDeltas
15 f.cond.Broadcast()
16 } else {
17 if oldDeltas == nil {
18 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
19 return nil
20 }
21 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
22 f.items[id] = newDeltas
23 return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
24 }
25 return nil
26}
到这里再反过来看 Add() Delete() Update() Get() 等函数,就很清晰了,只是将对应变化类型的 obj 添加到队列中。
Pop()
Pop 按照元素的添加或更新顺序有序返回一个元素(Deltas),在队列为空时会阻塞。另外 Pop 过程会先从队列中删除一个元素然后返回,所以如果处理失败了需要通过 AddIfNotPresent()
方法将这个元素加回到队列中。
Pop 的参数是 type PopProcessFunc func(interface{}) error
类型的 process,中 Pop()
函数中直接将队列里的第一个元素出队,然后丢给 process 处理,如果处理失败会重新入队,但是这个 Deltas 和对应的错误信息会被返回。
- client-go/tools/cache/delta_fifo.go:515
1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
2 f.lock.Lock()
3 defer f.lock.Unlock()
4 for {
5 for len(f.queue) == 0 {
6 if f.closed {
7 return nil, ErrFIFOClosed
8 }
9 f.cond.Wait()
10 }
请发表评论