深挖 Netty 源碼:時間輪底層原理分析
本文選自 Doocs 開源社區(qū)旗下“源碼獵人”項目,作者?tydhot[1]。
項目將會持續(xù)更新,歡迎 Star 關注。
項目地址:https://github.com/doocs/source-code-hunter
該文所涉及的 Netty 源碼版本為 4.1.6。
HashedWheelTimer 是什么
Netty 的時間輪?HashedWheelTimer?給出了一個粗略的定時器實現(xiàn),之所以稱之為粗略的實現(xiàn)是因為該時間輪并沒有嚴格的準時執(zhí)行定時任務,而是在每隔一個時間間隔之后的時間節(jié)點執(zhí)行,并執(zhí)行當前時間節(jié)點之前到期的定時任務。
當然具體的定時任務的時間執(zhí)行精度可以通過調節(jié) HashedWheelTimer 構造方法的時間間隔的大小來進行調節(jié),在大多數(shù)網絡應用的情況下,由于 IO 延遲的存在,并不會嚴格要求具體的時間執(zhí)行精度,所以默認的 100ms 時間間隔可以滿足大多數(shù)的情況,不需要再花精力去調節(jié)該時間精度。
HashedWheelTimer 的實現(xiàn)原理
HashedWheelTimer 內部的數(shù)據(jù)結構
private final HashedWheelBucket[] wheel;HashedWheelTimer 的主體數(shù)據(jù)結構 wheel 是一個由多個鏈表所組成的數(shù)組,默認情況下該數(shù)組的大小為 512。當定時任務準備加入到時間輪中的時候,將會以其等待執(zhí)行的時間為依據(jù)選擇該數(shù)組上的一個具體槽位上的鏈表加入。
private HashedWheelTimeout head;private HashedWheelTimeout tail;
在這個 wheel 數(shù)組中,每一個槽位都是一條由 HashedWheelTimeout 所組成的鏈表,其中鏈表中的每一個節(jié)點都是一個等待執(zhí)行的定時任務。
HashedWheelTimer 內部的線程模型
在 HashedWheelTimer 中,其內部是一個單線程的 worker 線程,通過類似 eventloop 的工作模式進行定時任務的調度。
@Overridepublic void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().startTimeInitialized.countDown();do {final long deadline = waitForNextTick();if (deadline > 0) {transferTimeoutsToBuckets();HashedWheelBucket bucket =wheel[(int) (tick & mask)];bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}unprocessedTimeouts.add(timeout);}}
簡單看到 HashedWheelTimer 內部的 woker 線程的?run()方法,在其首先會記錄啟動時間作為 startTime 作為接下來調度定時任務的時間依據(jù),而之后會通過 CountDownLatch 來通知所有外部線程當前 worker 工作線程已經初始化完畢。之后的循環(huán)體便是當時間輪持續(xù)生效的時間里的具體調度邏輯。時間刻度是時間輪的一個重要屬性,其默認為 100ms,此處的循環(huán)間隔便是時間輪的時間刻度,默認情況下就是間隔 100ms 進行一次調度循環(huán)。工作線程會維護當前工作線程具體循環(huán)了多少輪,用于定位具體執(zhí)行觸發(fā)時間輪數(shù)組上的哪一個位置上的鏈表。當時間輪準備 shutdown 的階段,最后的代碼會對未執(zhí)行的任務整理到未執(zhí)行的隊列中。
由此可見,worker 線程的 run()方法中基本定義了工作線程的整個生命周期,從初始的初始化到循環(huán)體中的具體調度,最后到未執(zhí)行任務的具體清理。整體的調度邏輯便主要在這里執(zhí)行。值得注意的是,在這里的前提下,每個 HashedWheelTimer 時間輪都會有一個工作線程進行調度,所以不需要在 netty 中在每一個連接中單獨使用一個 HashedWheelTimer 來進行定時任務的調度,否則可能將對性能產生影響。
向 HashedWheelTimer 加入一個定時任務的流程
當調用 HashedWheelTimer 的 newTimeout()方法的時候,即是將定時任務加入時間輪中的 api。
@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}start();long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}
當此次是首次向該時間輪加入定時任務的時候,將會通過 start()方法開始執(zhí)行上文所述的 worker 工作線程的啟動與循環(huán)調度邏輯,這里暫且不提。之后計算定時任務觸發(fā)時間相對于時間輪初始化時間的相對時間間隔 deadline,并將其包裝為一個鏈表節(jié)點 HashedWheelTimeout ,投入到 timeouts 隊列中,等待 worker 工作線程在下一輪調度循環(huán)中將其加入到時間輪的具體鏈表中等待觸發(fā)執(zhí)行,timeouts 的實現(xiàn)是一個 mpsc 隊列,關于 mpsc 隊列可以查看此文,這里也符合多生產者單消費者的隊列模型。
HashedWheelTimer 中工作線程的具體調度
do {final long deadline = waitForNextTick();if (deadline > 0) {transferTimeoutsToBuckets();HashedWheelBucket bucket =wheel[(int) (tick & mask)];bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
在 HashedWheelTimer 中的工作線程 run()方法的主要循環(huán)中,主要分為三個步驟。
首先 worker 線程會通過?waitForNextTick()方法根據(jù)時間輪的時間刻度等待一輪循環(huán)的開始,在默認情況下時間輪的時間刻度是 100ms,那么此處 worker 線程也將在這個方法中 sleep 相應的時間等待下一輪循環(huán)的開始。此處也決定了時間輪的定時任務時間精度。
當 worker 線程經過相應時間間隔的 sleep 之后,也代表新的一輪調度開始。此時,會通過?transferTimeoutsToBuckets()方法將之前剛剛加入到 timeouts 隊列中的定時任務放入到時間輪具體槽位上的鏈表中。
for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {timeout.remove();continue;}long calculated = timeout.deadline / tickDuration;long remainingRounds = (calculated - tick) / wheel.length;timeout.remainingRounds = remainingRounds;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
首先,在每一輪的調度中,最多只會從?timeouts?隊列中定位到時間輪 100000 個定時任務,這也是為了防止在這里耗時過久導致后面觸發(fā)定時任務的延遲。在這里會不斷從 timeouts 隊列中獲取剛加入的定時任務。
具體的計算流程便是將定時任務相對于時間輪初始化時間的相對間隔與時間輪的時間刻度相除得到相對于初始化時間的具體輪數(shù),之后便在減去當前輪數(shù)得到還需要遍歷幾遍整個時間輪數(shù)組得到 remainingRounds,最后將輪數(shù)與時間輪數(shù)組長度-1 相與,得到該定時任務到底應該存放到時間輪上哪個位置的鏈表。
用具體的數(shù)組舉個例子,該時間輪初始化時間為 12 點,時間刻度為 1 小時,時間輪數(shù)組長度為 8,當前時間 13 點,當向時間輪加入一個明天 13 點執(zhí)行的任務的時候,首先得到該任務相對于初始化的時間間隔是 25 小時,也就是需要 25 輪調度,而當前 13 點,當前調度輪數(shù)為 1,因此還需要 24 輪調度,就需要再遍歷 3 輪時間輪,因此 remainingRounds 為 3,再根據(jù) 25 與 8-1 相與的結果為 1,因此將該定時任務放置到時間輪數(shù)組下標為 1 的鏈表上等待被觸發(fā)。
這便是一次完整的定時任務加入到時間輪具體位置的計算。
在 worker 線程的最后,就需要來具體執(zhí)行定時任務了,首先通過當前循環(huán)輪數(shù)與時間輪數(shù)組長度-1 相與的結果定位具體觸發(fā)時間輪數(shù)組上哪個位置上的鏈表,再通過?expireTimeouts()方法依次對鏈表上的定時任務進行觸發(fā)執(zhí)行。這里的流程就相對很簡單,鏈表上的節(jié)點如果 remainingRounds 小于等于 0,那么就可以直接執(zhí)行這個定時任務,如果 remainingRounds 大于 0,那么顯然還沒有到達觸發(fā)的時間點,則將其-1 等待下一輪的調度之后再進行執(zhí)行。在繼續(xù)回到上面的例子,當 14 點來臨之時,此時工作線程將進行第 2 輪的調度,將會把 2 與 8-1 進行相與得到結果 2,那么當前工作線程就會選擇時間輪數(shù)組下標為 2 的鏈表依次判斷是否需要觸發(fā),如果 remainingRounds 為 0 將會直接觸發(fā),否則將會將 remainingRounds-1 等待下一輪的執(zhí)行。
全文完!
希望本文對大家有所幫助。如果感覺本文有幫助,有勞轉發(fā)或點一下“在看”!讓更多人收獲知識!
引用鏈接
[1]?tydhot:?https://github.com/tydhot
長按識別下圖二維碼,關注公眾號「Doocs 開源社區(qū)」,第一時間跟你們分享好玩、實用的技術文章與業(yè)內最新資訊。
