client-go 之 Reflector 源碼分析
圖片來源:https://unsplash.com/photos/mFl5WwGJnTs
前面我們說了 Informer 通過對(duì) APIServer 的資源對(duì)象執(zhí)行 List 和 Watch 操作,把獲取到的數(shù)據(jù)存儲(chǔ)在本地的緩存中,其中實(shí)現(xiàn)這個(gè)的核心功能就是 Reflector,我們可以稱其為反射器,從名字我們可以看出來它的主要功能就是反射,就是將 Etcd 里面的數(shù)據(jù)反射到本地存儲(chǔ)(DeltaFIFO)中。Reflector 首先通過 List 操作獲取所有的資源對(duì)象數(shù)據(jù),保存到本地存儲(chǔ),然后通過 Watch 操作監(jiān)控資源的變化,觸發(fā)相應(yīng)的事件處理,比如前面示例中的 Add 事件、Update 事件、Delete 事件。
Reflector 結(jié)構(gòu)體的定義位于 staging/src/k8s.io/client-go/tools/cache/reflector.go 下面:
// k8s.io/client-go/tools/cache/reflector.go// Reflector(反射器) 監(jiān)聽指定的資源,將所有的變化都反射到給定的存儲(chǔ)中去type Reflector struct {// name 標(biāo)識(shí)這個(gè)反射器的名稱,默認(rèn)為 文件:行數(shù)(比如reflector.go:125)// 默認(rèn)名字通過 k8s.io/apimachinery/pkg/util/naming/from_stack.go 下面的 GetNameFromCallsite 函數(shù)生成name string// 期望放到 Store 中的類型名稱,如果提供,則是 expectedGVK 的字符串形式// 否則就是 expectedType 的字符串,它僅僅用于顯示,不用于解析或者比較。expectedTypeName string// 我們放到 Store 中的對(duì)象類型expectedType reflect.Type// 如果是非結(jié)構(gòu)化的,我們期望放在 Sotre 中的對(duì)象的 GVKexpectedGVK *schema.GroupVersionKind// 與 watch 源同步的目標(biāo) Storestore Store// 用來執(zhí)行 lists 和 watches 操作的 listerWatcher 接口(最重要的)listerWatcher ListerWatcher// backoff manages backoff of ListWatchbackoffManager wait.BackoffManagerresyncPeriod time.Duration// ShouldResync 會(huì)周期性的被調(diào)用,當(dāng)返回 true 的時(shí)候,就會(huì)調(diào)用 Store 的 Resync 操作ShouldResync func() boolclock clock.ClockpaginatedResult bool// Kubernetes 資源在 APIServer 中都是有版本的,對(duì)象的任何修改(添加、刪除、更新)都會(huì)造成資源版本更新,lastSyncResourceVersion 就是指的這個(gè)版本lastSyncResourceVersion string// 如果之前的 list 或 watch 帶有 lastSyncResourceVersion 的請(qǐng)求中是一個(gè) HTTP 410(Gone)的失敗請(qǐng)求,則 isLastSyncResourceVersionGone 為 trueisLastSyncResourceVersionGone bool// lastSyncResourceVersionMutex 用于保證對(duì) lastSyncResourceVersion 的讀/寫訪問。lastSyncResourceVersionMutex sync.RWMutexWatchListPageSize int64}// NewReflector 創(chuàng)建一個(gè)新的反射器對(duì)象,將使給定的 Store 保持與服務(wù)器中指定的資源對(duì)象的內(nèi)容同步。// 反射器只把具有 expectedType 類型的對(duì)象放到 Store 中,除非 expectedType 是 nil。// 如果 resyncPeriod 是非0,那么反射器會(huì)周期性地檢查 ShouldResync 函數(shù)來決定是否調(diào)用 Store 的 Resync 操作// `ShouldResync==nil` 意味著總是要執(zhí)行 Resync 操作。// 這使得你可以使用反射器周期性地處理所有的全量和增量的對(duì)象。func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {// 默認(rèn)的反射器名稱為 file:linereturn NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)}// NewNamedReflector 與 NewReflector 一樣,只是指定了一個(gè) name 用于日志記錄func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {realClock := &clock.RealClock{}r := &Reflector{name: name,listerWatcher: lw,store: store,backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),resyncPeriod: resyncPeriod,clock: realClock,}r.setExpectedType(expectedType)return r}
從源碼中我們可以看出來通過 NewReflector 實(shí)例化反射器的時(shí)候,必須傳入一個(gè) ListerWatcher 接口對(duì)象,這個(gè)也是反射器最核心的功能,該接口擁有 List 和 Watch 方法,用于獲取和監(jiān)控資源對(duì)象。
// k8s.io/client-go/tools/cache/listwatch.go// Lister 是知道如何執(zhí)行初始化List列表的對(duì)象type Lister interface {// List 應(yīng)該返回一個(gè)列表類型的對(duì)象;// Items 字段將被解析,ResourceVersion 字段將被用于正確啟動(dòng) watch 的地方List(options metav1.ListOptions) (runtime.Object, error)}// Watcher 是知道如何執(zhí)行 watch 操作的任何對(duì)象type Watcher interface {// Watch 在指定的版本開始執(zhí)行 watch 操作Watch(options metav1.ListOptions) (watch.Interface, error)}// ListerWatcher 是任何知道如何對(duì)一個(gè)資源執(zhí)行初始化List列表和啟動(dòng)Watch監(jiān)控操作的對(duì)象type ListerWatcher interface {ListerWatcher}
而 Reflector 對(duì)象通過 Run 函數(shù)來啟動(dòng)監(jiān)控并處理監(jiān)控事件的:
// k8s.io/client-go/tools/cache/reflector.go// Run 函數(shù)反復(fù)使用反射器的 ListAndWatch 函數(shù)來獲取所有對(duì)象和后續(xù)的 deltas。// 當(dāng) stopCh 被關(guān)閉的時(shí)候,Run函數(shù)才會(huì)退出。func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {utilruntime.HandleError(err)}}, r.backoffManager, true, stopCh)klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)}
所以不管我們傳入的 ListWatcher 對(duì)象是如何實(shí)現(xiàn)的 List 和 Watch 操作,只要實(shí)現(xiàn)了就可以,最主要的還是看 ListAndWatch 函數(shù)是如何去實(shí)現(xiàn)的,如何去調(diào)用 List 和 Watch 的:
// k8s.io/client-go/tools/cache/reflector.go// ListAndWatch 函數(shù)首先列出所有的對(duì)象,并在調(diào)用的時(shí)候獲得資源版本,然后使用該資源版本來進(jìn)行 watch 操作。// 如果 ListAndWatch 沒有初始化 watch 成功就會(huì)返回錯(cuò)誤。func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)var resourceVersion stringoptions := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}if err := func() error {initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh := make(chan struct{}, 1)panicCh := make(chan interface{}, 1)go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// 如果 listWatcher 支持,會(huì)嘗試 chunks(分塊)收集 List 列表數(shù)據(jù)// 如果不支持,第一個(gè) List 列表請(qǐng)求將返回完整的響應(yīng)數(shù)據(jù)。pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize != 0:pager.PageSize = r.WatchListPageSizecase r.paginatedResult:// 獲得一個(gè)初始的分頁結(jié)果。假定此資源和服務(wù)器符合分頁請(qǐng)求,并保留默認(rèn)的分頁器大小設(shè)置case options.ResourceVersion != "" && options.ResourceVersion != "0":pager.PageSize = 0}list, paginatedResult, err = pager.List(context.Background(), options)if isExpiredError(err) {r.setIsLastSyncResourceVersionExpired(true)list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}()select {case <-stopCh:return nilcase r := <-panicCh:panic(r)case <-listCh:}if err != nil {return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)}if options.ResourceVersion == "0" && paginatedResult {r.paginatedResult = true}r.setIsLastSyncResourceVersionExpired(false) // list 成功initTrace.Step("Objects listed")listMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)}// 獲取資源版本號(hào)resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")// 將資源數(shù)據(jù)轉(zhuǎn)換成資源對(duì)象列表,將 runtime.Object 對(duì)象轉(zhuǎn)換成 []runtime.Object 對(duì)象items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)}initTrace.Step("Objects extracted")// 將資源對(duì)象列表中的資源對(duì)象和資源版本號(hào)存儲(chǔ)在 Store 中if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil}(); err != nil {return err}resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup()}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}// 如果 ShouldResync 為 nil 或者調(diào)用返回true,則執(zhí)行 Store 的 Resync 操作if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()resyncCh, cleanup = r.resyncChan()}}()for {// stopCh 一個(gè)停止循環(huán)的機(jī)會(huì)select {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))// 設(shè)置watch的選項(xiàng),因?yàn)榍捌诹信e了全量對(duì)象,從這里只要監(jiān)聽最新版本以后的資源就可以了// 如果沒有資源變化總不能一直掛著吧?也不知道是卡死了還是怎么了,所以設(shè)置一個(gè)超時(shí)會(huì)好一點(diǎn)options = metav1.ListOptions{ResourceVersion: resourceVersion,TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true,}start := r.clock.Now()// 執(zhí)行 Watch 操作w, err := r.listerWatcher.Watch(options)if err != nil {switch {case isExpiredError(err):klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)case err == io.EOF:// watch closed normallycase err == io.ErrUnexpectedEOF:klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)default:utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))}if utilnet.IsConnectionRefused(err) {time.Sleep(time.Second)continue}return nil}// 調(diào)用 watchHandler 來處理分發(fā) watch 到的事件對(duì)象if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {if err != errorStopRequested {switch {case isExpiredError(err):klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)default:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)}}return nil}}}
首先通過反射器的 relistResourceVersion 函數(shù)獲得反射器 relist 的資源版本,如果資源版本非 0,則表示根據(jù)資源版本號(hào)繼續(xù)獲取,當(dāng)傳輸過程中遇到網(wǎng)絡(luò)故障或者其他原因?qū)е轮袛?,下次再連接時(shí),會(huì)根據(jù)資源版本號(hào)繼續(xù)傳輸未完成的部分??梢允贡镜鼐彺嬷械臄?shù)據(jù)與Etcd集群中的數(shù)據(jù)保持一致,該函數(shù)實(shí)現(xiàn)如下所示:
// k8s.io/client-go/tools/cache/reflector.go// relistResourceVersion 決定了反射器應(yīng)該list或者relist的資源版本。// 如果最后一次relist的結(jié)果是HTTP 410(Gone)狀態(tài)碼,則返回"",這樣relist將通過quorum讀取etcd中可用的最新資源版本。// 返回使用 lastSyncResourceVersion,這樣反射器就不會(huì)使用在relist結(jié)果或watch事件中watch到的資源版本更老的資源版本進(jìn)行relist了func (r *Reflector) relistResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()if r.isLastSyncResourceVersionGone {// 因?yàn)榉瓷淦鲿?huì)進(jìn)行分頁List請(qǐng)求,如果 lastSyncResourceVersion 過期了,所有的分頁列表請(qǐng)求就都會(huì)跳過 watch 緩存// 所以設(shè)置 ResourceVersion="",然后再次 List,重新建立反射器到最新的可用資源版本,從 etcd 中讀取,保持一致性。return ""}if r.lastSyncResourceVersion == "" {// 反射器執(zhí)行的初始 List 操作的時(shí)候使用0作為資源版本。return "0"}return r.lastSyncResourceVersion}
上面的 ListAndWatch 函數(shù)實(shí)現(xiàn)看上去雖然非常復(fù)雜,但其實(shí)大部分是對(duì)分頁的各種情況進(jìn)行處理,最核心的還是調(diào)用 r.listerWatcher.List(opts) 獲取全量的資源對(duì)象,而這個(gè) List 其實(shí) ListerWatcher 實(shí)現(xiàn)的 List 方法,這個(gè) ListerWatcher 接口實(shí)際上在該接口定義的同一個(gè)文件中就有一個(gè) ListWatch 結(jié)構(gòu)體實(shí)現(xiàn)了:
// k8s.io/client-go/tools/cache/listwatch.go// ListFunc 知道如何 List 資源type ListFunc func(options metav1.ListOptions) (runtime.Object, error)// WatchFunc 知道如何 watch 資源type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)// ListWatch 結(jié)構(gòu)體知道如何 list 和 watch 資源對(duì)象,它實(shí)現(xiàn)了 ListerWatcher 接口。// 它為 NewReflector 使用者提供了方便的函數(shù)。其中 ListFunc 和 WatchFunc 不能為 nil。type ListWatch struct {ListFunc ListFuncWatchFunc WatchFunc// DisableChunking 對(duì) list watcher 請(qǐng)求不分塊。DisableChunking bool}// 列出一組 APIServer 資源func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {return lw.ListFunc(options)}// Watch 一組 APIServer 資源func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)}
當(dāng)我們真正使用一個(gè) Informer 對(duì)象的時(shí)候,實(shí)例化的時(shí)候就會(huì)調(diào)用這里的 ListWatch 來進(jìn)行初始化,比如前面我們實(shí)例中使用的 Deployment Informer,
// k8s.io/client-go/informers/apps/v1/deployment.go// NewFilteredDeploymentInformer 為 Deployment 構(gòu)造一個(gè)新的 Informer。// 總是傾向于使用一個(gè) informer 工廠來獲取一個(gè) shared informer,而不是獲取一個(gè)獨(dú)立的 informer,這樣可以減少內(nèi)存占用和服務(wù)器的連接數(shù)。func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)}func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)}func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)}
從上面代碼我們就可以看出來當(dāng)我們?nèi)フ{(diào)用一個(gè)資源對(duì)象的 Informer() 的時(shí)候,就會(huì)去調(diào)用上面的 NewFilteredDeploymentInformer 函數(shù)進(jìn)行初始化,而在初始化的使用就傳入了 cache.ListWatch 對(duì)象,其中就有 List 和 Watch 的實(shí)現(xiàn)操作,也就是說前面反射器在 ListAndWatch 里面調(diào)用的 ListWatcher 的 List 操作是在一個(gè)具體的資源對(duì)象的 Informer 中實(shí)現(xiàn)的,比如我們這里就是通過的 ClientSet 客戶端與 APIServer 交互獲取到 Deployment 的資源列表數(shù)據(jù)的,通過在 ListFunc 中的 client.AppsV1().Deployments(namespace).List(context.TODO(), options) 實(shí)現(xiàn)的,這下應(yīng)該好理解了吧。
獲取到了全量的 List 數(shù)據(jù)過后,通過 listMetaInterface.GetResourceVersion() 來獲取資源的版本號(hào),ResourceVersion(資源版本號(hào))非常重要,Kubernetes 中所有的資源都擁有該字段,它標(biāo)識(shí)當(dāng)前資源對(duì)象的版本號(hào),每次修改(CUD)當(dāng)前資源對(duì)象時(shí),Kubernetes API Server 都會(huì)更改 ResourceVersion,這樣 client-go 執(zhí)行 Watch 操作時(shí)可以根據(jù)ResourceVersion 來確定當(dāng)前資源對(duì)象是否發(fā)生了變化。
然后通過 meta.ExtractList 函數(shù)將資源數(shù)據(jù)轉(zhuǎn)換成資源對(duì)象列表,將 runtime.Object 對(duì)象轉(zhuǎn)換成 []runtime.Object 對(duì)象,因?yàn)槿揩@取的是一個(gè)資源列表。
接下來是通過反射器的 syncWith 函數(shù)將資源對(duì)象列表中的資源對(duì)象和資源版本號(hào)存儲(chǔ)在 Store 中,這個(gè)會(huì)在后面的章節(jié)中詳細(xì)說明。
最后處理完成后通過 r.setLastSyncResourceVersion(resourceVersion) 操作來設(shè)置最新的資源版本號(hào),其他的就是啟動(dòng)一個(gè) goroutine 去定期檢查是否需要執(zhí)行 Resync 操作,調(diào)用存儲(chǔ)中的 r.store.Resync() 來執(zhí)行,關(guān)于存儲(chǔ)的實(shí)現(xiàn)在后面和大家進(jìn)行講解。
緊接著就是 Watch 操作了,Watch 操作通過 HTTP 協(xié)議與 APIServer 建立長連接,接收Kubernetes API Server 發(fā)來的資源變更事件,和 List 操作一樣,Watch 的真正實(shí)現(xiàn)也是具體的 Informer 初始化的時(shí)候傳入的,比如上面的 Deployment Informer 中初始化的時(shí)候傳入的 WatchFunc,底層也是通過 ClientSet 客戶端對(duì) Deployment 執(zhí)行 Watch 操作 client.AppsV1().Deployments(namespace).Watch(context.TODO(), options) 實(shí)現(xiàn)的。
獲得 watch 的資源數(shù)據(jù)后,通過調(diào)用 r.watchHandler 來處理資源的變更事件,當(dāng)觸發(fā)Add 事件、Update 事件、Delete 事件時(shí),將對(duì)應(yīng)的資源對(duì)象更新到本地緩存(DeltaFIFO)中并更新 ResourceVersion 資源版本號(hào)。
// k8s.io/client-go/tools/cache/reflector.go// watchHandler 監(jiān)聽 w 保持資源版本最新func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {eventCount := 0defer w.Stop()loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan(): // 從 watch 中獲取事件對(duì)象if !ok {break loop}if event.Type == watch.Error {return apierrors.FromObject(event.Object)}if r.expectedType != nil {if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}if r.expectedGVK != nil {if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue}// 獲得當(dāng)前 watch 到資源的資源版本號(hào)newResourceVersion := meta.GetResourceVersion()switch event.Type { // 分發(fā)事件case watch.Added: // Add 事件err := r.store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Modified: // Update 事件err := r.store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Deleted: // Delete 事件err := r.store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))}case watch.Bookmark:// `Bookmark` 意味著 watch 已經(jīng)同步到這里了,只要更新資源版本即可。default:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}// 更新資源版本號(hào)*resourceVersion = newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}watchDuration := r.clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)return nil}
這就是 Reflector 反射器中最核心的 ListAndWatch 實(shí)現(xiàn),從上面的實(shí)現(xiàn)我們可以看出獲取到的數(shù)據(jù)最終都流向了本地的 Store,也就是 DeltaFIFO,所以接下來我們需要來分析 DeltaFIFO 的實(shí)現(xiàn)。
K8S進(jìn)階訓(xùn)練營,點(diǎn)擊下方圖片了解詳情

