時間輪在Kafka的實踐

桔妹導讀:時間輪是一個應用場景很廣的組件,在很多高性能中間件中都有它的身影,如Netty、Quartz、Akka,當然也包括Kafka,本文主要介紹時間輪在kafka的應用和實戰(zhàn),從核心源碼和設計的角度對時間輪進行深入的講解 。
1.


30s超時,就創(chuàng)建一個index從0到30的環(huán)形隊列(本質是個數(shù)組) 環(huán)上每一個slot是一個Set<uid>,任務集合 同時還有一個Map<uid, index>,記錄uid落在環(huán)上的哪個slot里
從Map結構中,查找出這個uid存儲在哪一個slot里 從這個slot的Set結構中,刪除這個uid 將uid重新加入到新的slot中,具體是哪一個slot呢 => Current Index指針所指向的上一個slot,因為這個slot,會被timer在30s之后掃描到 更新Map,這個uid對應slot的index值
2.

圖二tickMs:時間輪由多個時間格組成,每個時間格就是tickMs,它代表當前時間輪的基本時間跨度。 wheelSize:代表每一層時間輪的格數(shù) interval:當前時間輪的總體時間跨度,interval=tickMs × wheelSize startMs:構造當層時間輪時候的當前時間,第一層的時間輪的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上層時間輪的startMs為下層時間輪的currentTime。 currentTime:表示時間輪當前所處的時間,currentTime是tickMs的整數(shù)倍(通過currentTime=startMs - (startMs % tickMs來保正currentTime一定是tickMs的整數(shù)倍),這個運算類比鐘表中分鐘里65秒分鐘指針指向的還是1分鐘)。currentTime可以將整個時間輪劃分為到期部分和未到期部分,currentTime當前指向的時間格也屬于到期部分,表示剛好到期,需要處理此時間格所對應的TimerTaskList的所有任務。

剛才提到的350ms的任務,不會插入到第一層時間輪,會插入到interval=20*20的第二層時間輪中,具體插入到時間輪的哪個bucket呢?先用350/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350會放在第17個bucket。如果此時有一個450ms后執(zhí)行的任務,那么會放在第三層時間輪中,按照剛才的計算公式,會放在第0個bucket。第0個bucket里會包含[400,800)ms的任務。隨著時間流逝,當時間過去了400ms,那么450ms后就要執(zhí)行的任務還剩下50ms的時間才能執(zhí)行,此時有一個時間輪降級的操作,將50ms任務重新提交到層級時間輪中,那么此時50ms的任務根據(jù)公式會放入第二個時間輪的第2個bucket中,此bucket的時間范圍為[40,60)ms,然后再經過40ms,這個50ms的任務又會被監(jiān)控到,此時距離任務執(zhí)行還有10ms,同樣將10ms的任務提交到層級時間輪,此時會加入到第一層時間輪的第10個bucket,所以再經過10ms后,此任務到期,最終執(zhí)行。

//在Systemtimer中添加一個任務,任務被包裝為一個TimerTaskEntryprivate def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {//先判斷是否可以添加進時間輪中,如果不可以添加進去代表任務已經過期或者任務被取消,注意這里的timingWheel持有上一層時間輪的引用,所以可能存在遞歸調用if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelledif (!timerTaskEntry.cancelled)//過期任務直接線程池異步執(zhí)行掉taskExecutor.submit(timerTaskEntry.timerTask)}}timingWheel添加任務,遞歸添加直到添加該任務進合適的時間輪的bucket中def add(timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMs//任務取消if (timerTaskEntry.cancelled) {// Cancelledfalse} else if (expiration < currentTime + tickMs) {// 任務過期后會被執(zhí)行false} else if (expiration < currentTime + interval) {//任務過期時間比當前時間輪時間加周期小說明任務過期時間在本時間輪周期內val virtualId = expiration / tickMs//找到任務對應本時間輪的bucketval bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// Set the bucket expiration time//只有本bucket內的任務都過期后才會bucket.setExpiration返回true此時將bucket放入延遲隊列if (bucket.setExpiration(virtualId * tickMs)) {//bucket是一個TimerTaskList,它實現(xiàn)了java.util.concurrent.Delayed接口,里面是一個多任務組成的鏈表,圖2有說明queue.offer(bucket)}true} else {// Out of the interval. Put it into the parent timer//任務的過期時間不在本時間輪周期內說明需要升級時間輪,如果不存在則構造上一層時間輪,繼續(xù)用上一層時間輪添加任務if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}}
private[this] def addOverflowWheel(): Unit = {synchronized {if (overflowWheel == null) {overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}
def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {//把當前時間打平為時間輪tickMs的整數(shù)倍currentTime = timeMs - (timeMs % tickMs)// Try to advance the clock of the overflow wheel if present//驅動上層時間輪,這里的傳給上層的currentTime時間是本層時間輪打平過的,但是在上層時間輪還是會繼續(xù)打平if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}}
//循環(huán)bucket里面的任務列表,一個個重新添加進時間輪,對符合條件的時間輪進行升降級或者執(zhí)行任務private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)/** Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,* waits up to timeoutMs before giving up.*/def advanceClock(timeoutMs: Long): Boolean = {var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//驅動時間輪timingWheel.advanceClock(bucket.getExpiration())//循環(huán)buckek也就是任務列表,任務列表一個個繼續(xù)添加進時間輪以此來升級或者降級時間輪,把過期任務找出來執(zhí)行bucket.flush(reinsert)//循環(huán)//這里就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,我們通過bucket能取到bucket包含的任務列表bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false}}
3.

kafka的延遲隊列使用時間輪實現(xiàn),能夠支持大量任務的高效觸發(fā),但是在kafka延遲隊列實現(xiàn)方案里還是看到了delayQueue的影子,使用delayQueue是對時間輪里面的bucket放入延遲隊列,以此來推動時間輪滾動,但是基于將插入和刪除操作則放入時間輪中,將這些操作的時間復雜度都降為O(1),提升效率。Kafka對性能的極致追求讓它把最合適的組件放在最適合的位置。
?

滴滴車險團隊架構師,負責車險核心系統(tǒng)的架構和設計,十年互聯(lián)網研發(fā)架構經驗,其中五年中間件與基礎架構經驗,對高并發(fā),高可用以及分布式應用的架構設計有豐富的實戰(zhàn)經驗,尤其對分布式消息隊列,分布式流程編排引擎、分布式數(shù)據(jù)庫中間件有較深入的研究,熱愛技術,崇尚開源,是Kafka、RocketMQ、Conductor等多個知名開源項目的源碼貢獻者。

團隊招聘
?
滴滴車險團隊基于滴滴近百萬輛車和海量數(shù)據(jù),通過線上化、科技化、數(shù)據(jù)化的手段,達到車險的降賠付、降發(fā)生,降保費,為乘客、司機、以及車隊、合作伙伴提供方便快捷高效的車險金融服務。
團隊長期招聘java高級工程師和技術專家,歡迎有興趣的小伙伴加入,可投遞簡歷至 [email protected],郵件請郵件主題請命名為「姓名-應聘部門-應聘方向」。
掃碼了解更多崗位
延伸閱讀




