Kafka的心跳處理機制竟然用到了時間輪算法?
點擊上方“服務端思維”,選擇“設為星標”
回復”669“獲取獨家整理的精選資料集
回復”加群“加入全國服務端高端社群「后端圈」
? 作者 | 丁威
Broker端與客戶端的心跳在Kafka中非常的重要,因為一旦在一個心跳過期周期內(默認10s),Broker端的消費組組協(xié)調器(GroupCoordinator)會把消費者從消費組中移除,從而觸發(fā)重平衡。在2.4.x以下其版本中,消費組一旦進入重平衡狀態(tài),該消費組內所有消費者全部暫停消費,直到重平衡完成。
本文將來探討Kafka的心跳機制的具體實現(xiàn)。本文的組織結構如下:
源碼解讀Kafka心跳機制 Kafka心跳架構設計亮點(時間輪調度算法實現(xiàn)原理圖)
溫馨提示:如果大家對源碼閱讀不感興趣,可以直接跳到本文的第二部分,用流程圖、數(shù)據(jù)結構圖闡述心跳的實現(xiàn)機制。
1、源碼分析Kafka心跳機制
在介紹源碼分析之前介紹筆直的一條源碼分析經(jīng)驗:找準入口,了解調用鏈路。故筆者會先尋找歸納出Kafka心跳處理的所有入口。
1.1Kafka心跳入口總結
Kafka心跳包的處理流程如下圖所示:

圖的右邊是kafka心跳在服務端的核心處理流程,而左邊主要展示kafka中所有的心跳請求,根據(jù)上圖得知Kafka觸發(fā)心跳處理的主要請求分別如下:
KafkaConsume主動發(fā)送心跳包 消費者會以3s的頻率向服務端發(fā)送心跳包,服務端對應的入口為 KafkaApis的handleHeartbeatRequest方法。
消費者加入消費組 在消費端重平衡過程中,客戶端主動向其組協(xié)調器發(fā)起Join_Group(加入消費組)時,組協(xié)調器會認為收到一個有效的心跳包,服務端對應的處理入口:KafkaApis的handleJoinGroup方法。
消費者獲取隊列負載結果 在重平衡的第二個階段,消費組的Leader在計算出分區(qū)負載結果后會發(fā)給組協(xié)調器,消費組中的其他成員需要發(fā)生Sync_Group請求獲取負載結果,組協(xié)調器同樣認為收到了一個有效的心跳包。服務端對應的處理入口:KafkaApis的handleSyncGroupRequest。
消費者提交位點 消費者組協(xié)調器收到消費者提交位點請求,同樣可以認定消費者是存活的。位點提交的處理入口:KafkaApis的handlerCommitOffsets方法。
__consumers_offsets主題的ISR的Leader發(fā)生變化
如果__consumers_offsets主題中的各個分區(qū)Leader發(fā)生變化,與特定分區(qū)的組協(xié)調器需要重新選舉,與此組協(xié)調器相關的消費者將觸發(fā)重平衡。
上述任何一種請求,都能表明消費端是存活的,故能有效阻止服務端將客戶端端心跳設置為過期,進入下一個心跳檢測周期。
上述各個入口,特別是__consumers_offsets的ISR對消費組的影響,后續(xù)會專門展開研究,現(xiàn)在我們將重心轉移到服務端是如何處理一個心跳包的。
1.2 源碼分析Kafka心跳處理機制
從上面的流程圖可以得出,Kafka收到一個心跳包后的處理入口為GroupCoordinator的completeAndScheduleNextExpiration方法,核心代碼如下圖所示:

在介紹該方法之前首先介紹一個該方法的入?yún)⒑x:
GroupMetadata group 消費組的元信息。 MemberMetadata member 消費者的元信息。 long timeoutMs 心跳超時時間,默認為10s,這個參數(shù)是由消費端的session.timeout.ms參數(shù)設置,默認為10s。
Step1:為消費組設置唯一標識:groupId + "-" + memberId構成。
Step2:將hearbeatSatisfied設置為true,表示該消費者收到一個有效的心跳包。
Step3:收到一個有效的心跳包,通知定時調度器停止本次的心跳過期檢測。
Step4:構建一個DelayedHearbeat,進入下一個心跳檢測周期。
接下來將分別對Step3、Step4展開詳細介紹。
1.2.1 心跳檢測正常處理邏輯
在收到一個心跳包時,嘗試將本次檢測設置成功,具體的實現(xiàn)由DelayedOperation的checkAndComplete方法,代碼如下:

Kafka使用一個數(shù)據(jù)結構來存儲需要跟蹤的所有消費者,在這里成為Watch機制。
實現(xiàn)要點:根據(jù)key獲取WatchList,然后從獲取的WatchList中內部的ConcurrentMap中再按照Key獲取對應與當前消費者對應的Watch。
如果沒有找到對應消費者的Watch,則直接返回,無需檢測,說明已經(jīng)成功檢測。 如果找到了對應消費者的Watch,則執(zhí)行被watch的tryCompleteWatched方法。
Watch的數(shù)據(jù)結構如下:
接下來重點關注Watches的tryCompleteWatched方法,該方法的詳細調用代碼如下圖所示:
如果消費組的狀態(tài)處于Dead 如果消費組的狀態(tài)為Pending(消費組在重平衡中) hearbeatSatisfied為true,即收到了一個有效的心跳包。
上述代碼的實現(xiàn)比較簡單,這里就不一一羅列,其核心關鍵點如下:
刪除對應的Watch,表示一次心跳檢測成功。 Watchs中存儲的對象是DelayedOperation(Kafka延遲類型的父類)的子類,在心跳檢測中具體為DelayedHeartbeat。 最終執(zhí)行DelayedOperation的是TimeTask的cancel方法(取消延遲任務),就是從延遲調度中移除自己,表示沒有超時,結束本輪的超時檢測,具體的存儲結構,將在下文詳介紹如果開啟新一輪心跳檢測時再詳細講解。
為了方便大家閱讀源碼,其主要的調用時序圖如下:

1.2.2 開啟下一輪心跳檢測
1.2.2.1將延遲任務放入時間輪
在接受到一個新的心跳包首先用于清除上一輪設置的延遲任務,然后需要開啟一個新的延遲任務,接下來我們將來具體看看Kafka如何開啟新一輪心跳檢測機制,**其本質上是Kafka的延遲(定時)實現(xiàn)原理。**代碼入口如下圖所示:

開啟下一輪調度時首先將Member的heartbeatSatisfied設置為false。
其核心思想是創(chuàng)建一個心跳延遲任務DelayedHeartbeat,并對其檢測是否完成或者添加Watch,啟動心跳延遲或者等待下一個心跳包的到來。
其實看到這里,我們應該能得到一個關于Kafka心跳檢測機制的實現(xiàn)思路:
開啟一個延遲任務,延遲檢查時間為心跳過期時間,一旦延遲任務執(zhí)行,則意外著心跳超時。 當收到一個心跳包時,需要取消上一次設置的延遲任務。 使用循環(huán)使用延遲任務,從而實現(xiàn)類似定時任務的效果。
接下來我們詳細探討一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代碼如下圖所示:

Step1:嘗試調用DelayedHeartbeat的tryComplete方法,判斷是否可以判斷完成,這里主要是消費組是否為重平衡或者狀態(tài)為Dead,如果上述情況不滿足,則會返回false,因為在發(fā)起下一輪心跳包時已將heartbeatSatisfied設置為false。
Step2:為該消費者添加到Watch中,表示kafka需要跟蹤該消費者的心跳。
Step3:再次調用maybeTryComplete方法,再嘗試判斷是否該心跳檢測完成。
Step4:如果沒有完成,則該任務延遲任務(DelayedHeartbeat)添加到定時調度中。
接下來將進入到Kafka心跳的核心機制,即延遲任務的實現(xiàn)機制。


并持有一個關鍵字段:該定時任務的過期時間,等于系統(tǒng)當前時間+過期時間,在心跳檢測場景中默認為10s。
繼續(xù)跟蹤SystemTimer的addTimerTaskEntry,其代碼如下:

addTimerTaskEntry的核心實現(xiàn)如下:
嘗試將延遲任務添加到時間輪,如果已經(jīng)過期,則提交到線程池,觸發(fā)心跳過期的邏輯,提交到線程后,DelayedOperation的run方法會被調用,最終onExpiration方法被調用。

接下來重點談一下往時間輪中添加任務的具體實現(xiàn),核心代碼見下圖所示:

核心實現(xiàn)要點:
Step1:如果任務已經(jīng)被取消或者已過期,返回false。如果返回false,則會觸發(fā)定時任務過期。
Step2根據(jù)過期時間,放入到時間輪中指定的位置,時間輪的數(shù)據(jù)結構如下:

每一個格代表一個時間間隔,例如200ms,當前指針指向的格子,代表該格子中的所有任務過期,例如現(xiàn)在要要插入一個700ms過期,從當前指針的下一格開始算起,放入第4格中。
另外時間輪的總格子有限,則該時間輪能計算的最大時間是有限的,例如一個8格的時間輪,每一格代表200ms,則如果要在2s后過期,顯然這個時間輪無法存儲,通常的解決方案是采用多級時間輪,另外一級的時間輪,其時間精度會更粗。
結合上述關于時間輪的原理,再去看上述代碼,就顯得容易看懂了。
Step3:就是處理第一級時間輪無法滿足過期時間,則放入到第二級時間輪中。
1.2.2.2 驅動時間輪
基于時間輪算法,除了數(shù)據(jù)按找時間輪到方向、觸發(fā)時間存儲在合適的刻度量,還需要驅動時間輪指針。Kafka中的驅動時間輪入口為:

具體實現(xiàn)代碼如下:

上述代碼看起來比較簡單,就不一一介紹,為了方便大家讀懂上面的代碼,我們只需要了解一下kafka采用時間輪的實際存儲數(shù)據(jù)結構,即能很容易理解上述代碼:

其核心特點:環(huán)形隊列就是一個數(shù)組,每一個元素在Kafka中對應一個桶,每一個桶存儲一個TimerTaskList(鏈表),每次指針指向的TimerTaskList,將該鏈表中的元素代表的任務全部執(zhí)行。
2、圖解Kafka心跳架構設計
讀起源碼來說或許比較枯燥,接下來給出Kafka心跳處理的圖解,重點是闡述Kafka時間輪算法的核心數(shù)據(jù)結構。

— 本文結束 —

關注我,回復 「加群」 加入各種主題討論群。
對「服務端思維」有期待,請在文末點個在看
喜歡這篇文章,歡迎轉發(fā)、分享朋友圈


