1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        萬字詳解本地緩存之王 Caffeine

        共 12375字,需瀏覽 25分鐘

         ·

        2020-12-23 10:19



        Alben|https://albenw.github.io/posts/a4ae1aa2/

        排版公眾號:JavaGuide

        概要

        Caffeine[1]是一個高性能,高命中率,低內(nèi)存占用,near optimal 的本地緩存,簡單來說它是 Guava Cache 的優(yōu)化加強(qiáng)版,有些文章把 Caffeine 稱為“新一代的緩存”、“現(xiàn)代緩存之王”。

        本文將重點講解 Caffeine 的高性能設(shè)計,以及對應(yīng)部分的源碼分析。

        與 Guava Cache 比較

        如果你對 Guava Cache 還不理解的話,可以點擊這里[2]來看一下我之前寫過關(guān)于 Guava Cache 的文章。

        大家都知道,Spring5 即將放棄掉 Guava Cache 作為緩存機(jī)制,而改用 Caffeine 作為新的本地 Cache 的組件,這對于 Caffeine 來說是一個很大的肯定。為什么 Spring 會這樣做呢?其實在 Caffeine 的Benchmarks[3]里給出了好靚仔的數(shù)據(jù),對讀和寫的場景,還有跟其他幾個緩存工具進(jìn)行了比較,Caffeine 的性能都表現(xiàn)很突出。

        使用 Caffeine

        Caffeine 為了方便大家使用以及從 Guava Cache 切換過來(很有針對性啊~),借鑒了 Guava Cache 大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于 Caffeine 的理解只要把它當(dāng)作 Guava Cache 就可以了。

        使用上,大家只要把 Caffeine 的包引進(jìn)來,然后換一下 cache 的實現(xiàn)類,基本應(yīng)該就沒問題了。這對與已經(jīng)使用過 Guava Cache 的同學(xué)來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過 Guava Cache,可以查看 Caffeine 的官方 API 說明文檔[4],其中PopulationEviction,RemovalRefresh,StatisticsCleanup,Policy等等這些特性都是跟 Guava Cache 基本一樣的。

        下面給出一個例子說明怎樣創(chuàng)建一個 Cache:

        private?static?LoadingCache?cache?=?Caffeine.newBuilder()
        ????????????//最大個數(shù)限制
        ????????????.maximumSize(256L)
        ????????????//初始化容量
        ????????????.initialCapacity(1)
        ????????????//訪問后過期(包括讀和寫)
        ????????????.expireAfterAccess(2,?TimeUnit.DAYS)
        ????????????//寫后過期
        ????????????.expireAfterWrite(2,?TimeUnit.HOURS)
        ????????????//寫后自動異步刷新
        ????????????.refreshAfterWrite(1,?TimeUnit.HOURS)
        ????????????//記錄下緩存的一些統(tǒng)計數(shù)據(jù),例如命中率等
        ????????????.recordStats()
        ????????????//cache對緩存寫的通知回調(diào)
        ????????????.writer(new?CacheWriter()?{
        ????????????????@Override
        ????????????????public?void?write(@NonNull?Object?key,?@NonNull?Object?value)?{
        ????????????????????log.info("key={},?CacheWriter?write",?key);
        ????????????????}

        ????????????????@Override
        ????????????????public?void?delete(@NonNull?Object?key,?@Nullable?Object?value,?@NonNull?RemovalCause?cause)?{
        ????????????????????log.info("key={},?cause={},?CacheWriter?delete",?key,?cause);
        ????????????????}
        ????????????})
        ????????????//使用CacheLoader創(chuàng)建一個LoadingCache
        ????????????.build(new?CacheLoader()?{
        ????????????????//同步加載數(shù)據(jù)
        ????????????????@Nullable
        ????????????????@Override
        ????????????????public?String?load(@NonNull?String?key)?throws?Exception?{
        ????????????????????return?"value_"?+?key;
        ????????????????}

        ????????????????//異步加載數(shù)據(jù)
        ????????????????@Nullable
        ????????????????@Override
        ????????????????public?String?reload(@NonNull?String?key,?@NonNull?String?oldValue)?throws?Exception?{
        ????????????????????return?"value_"?+?key;
        ????????????????}
        ????????????});

        更多從 Guava Cache 遷移過來的使用說明,請看這里[5]

        Caffeine 的高性能設(shè)計

        判斷一個緩存的好壞最核心的指標(biāo)就是命中率,影響緩存命中率有很多因素,包括業(yè)務(wù)場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標(biāo)。下面

        我們來看看 Caffeine 在這幾個方面是怎么著手的,如何做優(yōu)化的。

        (注:本文不會分析 Caffeine 全部源碼,只會對核心設(shè)計的實現(xiàn)進(jìn)行分析,但我建議讀者把 Caffeine 的源碼都涉獵一下,有個 overview 才能更好理解本文。如果你看過 Guava Cache 的源碼也行,代碼的數(shù)據(jù)結(jié)構(gòu)和處理邏輯很類似的。

        源碼基于:caffeine-2.8.0.jar)

        W-TinyLFU 整體設(shè)計

        上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到 LFU(Least Frequently Used,即最不經(jīng)常使用) 或者LRU(Least Recently Used,即最近最少使用) ,而 Caffeine 就是使用了 W-TinyLFU 算法。

        W-TinyLFU 看名字就能大概猜出來,它是 LFU 的變種,也是一種緩存淘汰算法。那為什么要使用 W-TinyLFU 呢?

        LRU 和 LFU 的缺點

        • LRU 實現(xiàn)簡單,在一般情況下能夠表現(xiàn)出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然 LRU 對突發(fā)性的稀疏流量(sparse bursts)表現(xiàn)很好,但同時也會產(chǎn)生緩存污染,舉例來說,如果偶然性的要對全量數(shù)據(jù)進(jìn)行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。
        • 如果數(shù)據(jù)的分布在一段時間內(nèi)是固定的話,那么 LFU 可以達(dá)到最高的命中率。但是 LFU 有兩個缺點,第一,它需要給每個記錄項維護(hù)頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發(fā)性的稀疏流量無力,因為前期經(jīng)常訪問的記錄已經(jīng)占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占著了。

        無論 LRU 還是 LFU 都有其各自的缺點,不過,現(xiàn)在已經(jīng)有很多針對其缺點而改良、優(yōu)化出來的變種算法。

        TinyLFU

        TinyLFU 就是其中一個優(yōu)化算法,它是專門為了解決 LFU 上述提到的兩個問題而被設(shè)計出來的。

        解決第一個問題是采用了 Count–Min Sketch 算法。

        解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當(dāng)有新的記錄插入時,可以讓它跟老的記錄進(jìn)行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。

        下圖是 TinyLFU 設(shè)計圖(來自官方)

        統(tǒng)計頻率 Count–Min Sketch 算法

        如何對一個 key 進(jìn)行統(tǒng)計,但又可以節(jié)省空間呢?(不是簡單的使用HashMap,這太消耗內(nèi)存了),注意哦,不需要精確的統(tǒng)計,只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機(jī),或許已經(jīng)聯(lián)想到布隆過濾器(Bloom Filter)的應(yīng)用了。

        沒錯,將要介紹的 Count–Min Sketch 的原理跟 Bloom Filter 一樣,只不過 Bloom Filter 只有 0 和 1 的值,那么你可以把 Count–Min Sketch 看作是“數(shù)值”版的 Bloom Filter。

        更多關(guān)于 Count–Min Sketch 的介紹請自行搜索。

        在 TinyLFU 中,近似頻率的統(tǒng)計如下圖所示:

        對一個 key 進(jìn)行多次 hash 函數(shù)后,index 到多個數(shù)組位置后進(jìn)行累加,查詢時取多個值中的最小值即可。

        Caffeine 對這個算法的實現(xiàn)在FrequencySketch類。但 Caffeine 對此有進(jìn)一步的優(yōu)化,例如 Count–Min Sketch 使用了二維數(shù)組,Caffeine 只是用了一個一維的數(shù)組;再者,如果是數(shù)值類型的話,這個數(shù)需要用 int 或 long 來存儲,但是 Caffeine 認(rèn)為緩存的訪問頻率不需要用到那么大,只需要 15 就足夠,一般認(rèn)為達(dá)到 15 次的頻率算是很高的了,而且 Caffeine 還有另外一個機(jī)制來使得這個頻率進(jìn)行衰退減半(下面就會講到)。如果最大是 15 的話,那么只需要 4 個 bit 就可以滿足了,一個 long 有 64bit,可以存儲 16 個這樣的統(tǒng)計數(shù),Caffeine 就是這樣的設(shè)計,使得存儲效率提高了 16 倍。

        Caffeine 對緩存的讀寫(afterReadafterWrite方法)都會調(diào)用onAccesss 方法,而onAccess方法里有一句:

        frequencySketch().increment(key);

        這句就是追加記錄的頻率,下面我們看看具體實現(xiàn)

        //FrequencySketch的一些屬性

        //種子數(shù)
        static?final?long[]?SEED?=?{?//?A?mixture?of?seeds?from?FNV-1a,?CityHash,?and?Murmur3
        ????0xc3a5c85c97cb3127L,?0xb492b66fbe98f273L,?0x9ae16a3b2f90404fL,?0xcbf29ce484222325L};
        static?final?long?RESET_MASK?=?0x7777777777777777L;
        static?final?long?ONE_MASK?=?0x1111111111111111L;

        int?sampleSize;
        //為了快速根據(jù)hash值得到table的index值的掩碼
        //table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機(jī)都知道
        int?tableMask;
        //存儲數(shù)據(jù)的一維long數(shù)組
        long[]?table;
        int?size;

        /**
        ?*?Increments?the?popularity?of?the?element?if?it?does?not?exceed?the?maximum?(15).?The?popularity
        ?*?of?all?elements?will?be?periodically?down?sampled?when?the?observed?events?exceeds?a?threshold.
        ?*?This?process?provides?a?frequency?aging?to?allow?expired?long?term?entries?to?fade?away.
        ?*
        ?*?@param?e?the?element?to?add
        ?*/

        public?void?increment(@NonNull?E?e)?{
        ??if?(isNotInitialized())?{
        ????return;
        ??}

        ??//根據(jù)key的hashCode通過一個哈希函數(shù)得到一個hash值
        ??//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。
        ??int?hash?=?spread(e.hashCode());
        ??//這句光看有點難理解
        ??//就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。
        ??//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機(jī)數(shù),再左移2位,得到一個小于16的值
        ??int?start?=?(hash?&?3)?<2;

        ??//indexOf方法的意思就是,根據(jù)hash值和不同種子得到table的下標(biāo)index
        ??//這里通過四個不同的種子,得到四個不同的下標(biāo)index
        ??int?index0?=?indexOf(hash,?0);
        ??int?index1?=?indexOf(hash,?1);
        ??int?index2?=?indexOf(hash,?2);
        ??int?index3?=?indexOf(hash,?3);

        ??//根據(jù)index和start(+1,?+2,?+3)的值,把table[index]對應(yīng)的等分追加1
        ??//這個incrementAt方法有點難理解,看我下面的解釋
        ??boolean?added?=?incrementAt(index0,?start);
        ??added?|=?incrementAt(index1,?start?+?1);
        ??added?|=?incrementAt(index2,?start?+?2);
        ??added?|=?incrementAt(index3,?start?+?3);

        ??//這個reset等下說
        ??if?(added?&&?(++size?==?sampleSize))?{
        ????reset();
        ??}
        }

        /**
        ?*?Increments?the?specified?counter?by?1?if?it?is?not?already?at?the?maximum?value?(15).
        ?*
        ?*?@param?i?the?table?index?(16?counters)
        ?*?@param?j?the?counter?to?increment
        ?*?@return?if?incremented
        ?*/

        boolean?incrementAt(int?i,?int?j)?{
        ??//這個j表示16個等分的下標(biāo),那么offset就是相當(dāng)于在64位中的下標(biāo)(這個自己想想)
        ??int?offset?=?j?<2;
        ??//上面提到Caffeine把頻率統(tǒng)計最大定為15,即0xfL
        ??//mask就是在64位中的掩碼,即1111后面跟很多個0
        ??long?mask?=?(0xfL?<??//如果&的結(jié)果不等于15,那么就追加1。等于15就不會再加了
        ??if?((table[i]?&?mask)?!=?mask)?{
        ????table[i]?+=?(1L?<????return?true;
        ??}
        ??return?false;
        }

        /**
        ?*?Returns?the?table?index?for?the?counter?at?the?specified?depth.
        ?*
        ?*?@param?item?the?element's?hash
        ?*?@param?i?the?counter?depth
        ?*?@return?the?table?index
        ?*/

        int?indexOf(int?item,?int?i)?{
        ??long?hash?=?SEED[i]?*?item;
        ??hash?+=?hash?>>>?32;
        ??return?((int)?hash)?&?tableMask;
        }

        /**
        ?*?Applies?a?supplemental?hash?function?to?a?given?hashCode,?which?defends?against?poor?quality
        ?*?hash?functions.
        ?*/

        int?spread(int?x)?{
        ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
        ??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
        ??return?(x?>>>?16)?^?x;
        }

        知道了追加方法,那么讀取方法frequency就很容易理解了。

        /**
        ?*?Returns?the?estimated?number?of?occurrences?of?an?element,?up?to?the?maximum?(15).
        ?*
        ?*?@param?e?the?element?to?count?occurrences?of
        ?*?@return?the?estimated?number?of?occurrences?of?the?element;?possibly?zero?but?never?negative
        ?*/

        @NonNegative
        public?int?frequency(@NonNull?E?e)?{
        ??if?(isNotInitialized())?{
        ????return?0;
        ??}

        ??//得到hash值,跟上面一樣
        ??int?hash?=?spread(e.hashCode());
        ??//得到等分的下標(biāo),跟上面一樣
        ??int?start?=?(hash?&?3)?<2;
        ??int?frequency?=?Integer.MAX_VALUE;
        ??//循環(huán)四次,分別獲取在table數(shù)組中不同的下標(biāo)位置
        ??for?(int?i?=?0;?i?4;?i++)?{
        ????int?index?=?indexOf(hash,?i);
        ????//這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index]?+?等分的位置,再根據(jù)mask取出計數(shù)值
        ????int?count?=?(int)?((table[index]?>>>?((start?+?i)?<2))?&?0xfL);
        ????//取四個中的較小值
        ????frequency?=?Math.min(frequency,?count);
        ??}
        ??return?frequency;
        }

        通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結(jié)構(gòu)圖。

        注意紫色虛線框,其中藍(lán)色小格就是需要計算的位置:


        保新機(jī)制

        為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經(jīng)常的緩存,Caffeine 有一個 Freshness Mechanism。做法很簡答,就是當(dāng)整體的統(tǒng)計計數(shù)(當(dāng)前所有記錄的頻率統(tǒng)計之和,這個數(shù)值內(nèi)部維護(hù))達(dá)到某一個值時,那么所有記錄的頻率統(tǒng)計除以 2。

        從上面的代碼

        //size變量就是所有記錄的頻率統(tǒng)計之,即每個記錄加1,這個size都會加1
        //sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
        if?(added?&&?(++size?==?sampleSize))?{
        ??????reset();
        }

        看到reset方法就是做這個事情

        /**?Reduces?every?counter?by?half?of?its?original?value.?*/
        void?reset()?{
        ??int?count?=?0;
        ??for?(int?i?=?0;?i?????count?+=?Long.bitCount(table[i]?&?ONE_MASK);
        ????table[i]?=?(table[i]?>>>?1)?&?RESET_MASK;
        ??}
        ??size?=?(size?>>>?1)?-?(count?>>>?2);
        }

        關(guān)于這個 reset 方法,為什么是除以 2,而不是其他,及其正確性,在最下面的參考資料的 TinyLFU 論文中 3.3 章節(jié)給出了數(shù)學(xué)證明,大家有興趣可以看看。

        增加一個 Window?

        Caffeine 通過測試發(fā)現(xiàn) TinyLFU 在面對突發(fā)性的稀疏流量(sparse bursts)時表現(xiàn)很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

        于是 Caffeine 設(shè)計出一種新的 policy,即 Window Tiny LFU(W-TinyLFU),并通過實驗和實踐發(fā)現(xiàn) W-TinyLFU 比 TinyLFU 表現(xiàn)的更好。

        W-TinyLFU 的設(shè)計如下所示(兩圖等價):

        它主要包括兩個緩存模塊,主緩存是 SLRU(Segmented LRU,即分段 LRU),SLRU 包括一個名為 protected 和一個名為 probation 的緩存區(qū)。通過增加一個緩存區(qū)(即 Window Cache),當(dāng)有新的記錄插入時,會先在 window 區(qū)呆一下,就可以避免上述說的 sparse bursts 問題。

        淘汰策略(eviction policy)

        當(dāng) window 區(qū)滿了,就會根據(jù) LRU 把 candidate(即淘汰出來的元素)放到 probation 區(qū),如果 probation 區(qū)也滿了,就把 candidate 和 probation 將要淘汰的元素 victim,兩個進(jìn)行“PK”,勝者留在 probation,輸者就要被淘汰了。

        而且經(jīng)過實驗發(fā)現(xiàn)當(dāng) window 區(qū)配置為總?cè)萘康?1%,剩余的 99%當(dāng)中的 80%分給 protected 區(qū),20%分給 probation 區(qū)時,這時整體性能和命中率表現(xiàn)得最好,所以 Caffeine 默認(rèn)的比例設(shè)置就是這個。

        不過這個比例 Caffeine 會在運行時根據(jù)統(tǒng)計數(shù)據(jù)(statistics)去動態(tài)調(diào)整,如果你的應(yīng)用程序的緩存隨著時間變化比較快的話,那么增加 window 區(qū)的比例可以提高命中率,相反緩存都是比較固定不變的話,增加 Main Cache 區(qū)(protected 區(qū) +probation 區(qū))的比例會有較好的效果。

        下面我們看看上面說到的淘汰策略是怎么實現(xiàn)的:

        一般緩存對讀寫操作后都有后續(xù)的一系列“維護(hù)”操作,Caffeine 也不例外,這些操作都在maintenance方法,我們將要說到的淘汰策略也在里面。

        這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關(guān)的evictEntriesclimb。

        /**
        ???*?Performs?the?pending?maintenance?work?and?sets?the?state?flags?during?processing?to?avoid
        ???*?excess?scheduling?attempts.?The?read?buffer,?write?buffer,?and?reference?queues?are
        ???*?drained,?followed?by?expiration,?and?size-based?eviction.
        ???*
        ???*?@param?task?an?additional?pending?task?to?run,?or?{@code?null}?if?not?present
        ???*/

        ??@GuardedBy("evictionLock")
        ??void?maintenance(@Nullable?Runnable?task)?{
        ????lazySetDrainStatus(PROCESSING_TO_IDLE);

        ????try?{
        ??????drainReadBuffer();

        ??????drainWriteBuffer();
        ??????if?(task?!=?null)?{
        ????????task.run();
        ??????}

        ??????drainKeyReferences();
        ??????drainValueReferences();

        ??????expireEntries();
        ??????//把符合條件的記錄淘汰掉
        ??????evictEntries();
        ??????//動態(tài)調(diào)整window區(qū)和protected區(qū)的大小
        ??????climb();
        ????}?finally?{
        ??????if?((drainStatus()?!=?PROCESSING_TO_IDLE)?||?!casDrainStatus(PROCESSING_TO_IDLE,?IDLE))?{
        ????????lazySetDrainStatus(REQUIRED);
        ??????}
        ????}
        ??}
        先說一下 Caffeine 對上面說到的 W-TinyLFU 策略的實現(xiàn)用到的數(shù)據(jù)結(jié)構(gòu):
        //最大的個數(shù)限制
        long?maximum;
        //當(dāng)前的個數(shù)
        long?weightedSize;
        //window區(qū)的最大限制
        long?windowMaximum;
        //window區(qū)當(dāng)前的個數(shù)
        long?windowWeightedSize;
        //protected區(qū)的最大限制
        long?mainProtectedMaximum;
        //protected區(qū)當(dāng)前的個數(shù)
        long?mainProtectedWeightedSize;
        //下一次需要調(diào)整的大?。ㄟ€需要進(jìn)一步計算)
        double?stepSize;
        //window區(qū)需要調(diào)整的大小
        long?adjustment;
        //命中計數(shù)
        int?hitsInSample;
        //不命中的計數(shù)
        int?missesInSample;
        //上一次的緩存命中率
        double?previousSampleHitRate;

        final?FrequencySketch?sketch;
        //window區(qū)的LRU?queue(FIFO)
        final?AccessOrderDeque>?accessOrderWindowDeque;
        //probation區(qū)的LRU?queue(FIFO)
        final?AccessOrderDeque>?accessOrderProbationDeque;
        //protected區(qū)的LRU?queue(FIFO)
        final?AccessOrderDeque>?accessOrderProtectedDeque;

        以及默認(rèn)比例設(shè)置(意思看注釋)

        /**?The?initial?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main?space.?*/
        static?final?double?PERCENT_MAIN?=?0.99d;
        /**?The?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main's?protected?space.?*/
        static?final?double?PERCENT_MAIN_PROTECTED?=?0.80d;
        /**?The?difference?in?hit?rates?that?restarts?the?climber.?*/
        static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
        /**?The?percent?of?the?total?size?to?adapt?the?window?by.?*/
        static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
        /**?The?rate?to?decrease?the?step?size?to?adapt?by.?*/
        static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
        /**?The?maximum?number?of?entries?that?can?be?transfered?between?queues.?*/

        重點來了,evictEntriesclimb方法:

        /**?Evicts?entries?if?the?cache?exceeds?the?maximum.?*/
        @GuardedBy("evictionLock")
        void?evictEntries()?{
        ??if?(!evicts())?{
        ????return;
        ??}
        ??//淘汰window區(qū)的記錄
        ??int?candidates?=?evictFromWindow();
        ??//淘汰Main區(qū)的記錄
        ??evictFromMain(candidates);
        }

        /**
        ?*?Evicts?entries?from?the?window?space?into?the?main?space?while?the?window?size?exceeds?a
        ?*?maximum.
        ?*
        ?*?@return?the?number?of?candidate?entries?evicted?from?the?window?space
        ?*/

        //根據(jù)W-TinyLFU,新的數(shù)據(jù)都會無條件的加到admission?window
        //但是window是有大小限制,所以要“定期”做一下“維護(hù)”
        @GuardedBy("evictionLock")
        int?evictFromWindow()?{
        ??int?candidates?=?0;
        ??//查看window?queue的頭部節(jié)點
        ??Node?node?=?accessOrderWindowDeque().peek();
        ??//如果window區(qū)超過了最大的限制,那么就要把“多出來”的記錄做處理
        ??while?(windowWeightedSize()?>?windowMaximum())?{
        ????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
        ????if?(node?==?null)?{
        ??????break;
        ????}
        ????//下一個節(jié)點
        ????Node?next?=?node.getNextInAccessOrder();
        ????if?(node.getWeight()?!=?0)?{
        ??????//把node定位在probation區(qū)
        ??????node.makeMainProbation();
        ??????//從window區(qū)去掉
        ??????accessOrderWindowDeque().remove(node);
        ??????//加入到probation?queue,相當(dāng)于把節(jié)點移動到probation區(qū)(晉升了)
        ??????accessOrderProbationDeque().add(node);
        ??????candidates++;
        ??????//因為移除了一個節(jié)點,所以需要調(diào)整window的size
        ??????setWindowWeightedSize(windowWeightedSize()?-?node.getPolicyWeight());
        ????}
        ????//處理下一個節(jié)點
        ????node?=?next;
        ??}

        ??return?candidates;
        }

        evictFromMain方法:

        /**
        ?*?Evicts?entries?from?the?main?space?if?the?cache?exceeds?the?maximum?capacity.?The?main?space
        ?*?determines?whether?admitting?an?entry?(coming?from?the?window?space)?is?preferable?to?retaining
        ?*?the?eviction?policy's?victim.?This?is?decision?is?made?using?a?frequency?filter?so?that?the
        ?*?least?frequently?used?entry?is?removed.
        ?*
        ?*?The?window?space?candidates?were?previously?placed?in?the?MRU?position?and?the?eviction
        ?*?policy's?victim?is?at?the?LRU?position.?The?two?ends?of?the?queue?are?evaluated?while?an
        ?*?eviction?is?required.?The?number?of?remaining?candidates?is?provided?and?decremented?on
        ?*?eviction,?so?that?when?there?are?no?more?candidates?the?victim?is?evicted.
        ?*
        ?*?@param?candidates?the?number?of?candidate?entries?evicted?from?the?window?space
        ?*/

        //根據(jù)W-TinyLFU,從window晉升過來的要跟probation區(qū)的進(jìn)行“PK”,勝者才能留下
        @GuardedBy("evictionLock")
        void?evictFromMain(int?candidates)?{
        ??int?victimQueue?=?PROBATION;
        ??//victim是probation?queue的頭部
        ??Node?victim?=?accessOrderProbationDeque().peekFirst();
        ??//candidate是probation?queue的尾部,也就是剛從window晉升來的
        ??Node?candidate?=?accessOrderProbationDeque().peekLast();
        ??//當(dāng)cache不夠容量時才做處理
        ??while?(weightedSize()?>?maximum())?{
        ????//?Stop?trying?to?evict?candidates?and?always?prefer?the?victim
        ????if?(candidates?==?0)?{
        ??????candidate?=?null;
        ????}

        ????//對candidate為null且victim為bull的處理
        ????if?((candidate?==?null)?&&?(victim?==?null))?{
        ??????if?(victimQueue?==?PROBATION)?{
        ????????victim?=?accessOrderProtectedDeque().peekFirst();
        ????????victimQueue?=?PROTECTED;
        ????????continue;
        ??????}?else?if?(victimQueue?==?PROTECTED)?{
        ????????victim?=?accessOrderWindowDeque().peekFirst();
        ????????victimQueue?=?WINDOW;
        ????????continue;
        ??????}

        ??????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
        ??????break;
        ????}

        ????//對節(jié)點的weight為0的處理
        ????if?((victim?!=?null)?&&?(victim.getPolicyWeight()?==?0))?{
        ??????victim?=?victim.getNextInAccessOrder();
        ??????continue;
        ????}?else?if?((candidate?!=?null)?&&?(candidate.getPolicyWeight()?==?0))?{
        ??????candidate?=?candidate.getPreviousInAccessOrder();
        ??????candidates--;
        ??????continue;
        ????}

        ????//?Evict?immediately?if?only?one?of?the?entries?is?present
        ????if?(victim?==?null)?{
        ??????@SuppressWarnings("NullAway")
        ??????Node?previous?=?candidate.getPreviousInAccessOrder();
        ??????Node?evict?=?candidate;
        ??????candidate?=?previous;
        ??????candidates--;
        ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
        ??????continue;
        ????}?else?if?(candidate?==?null)?{
        ??????Node?evict?=?victim;
        ??????victim?=?victim.getNextInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
        ??????continue;
        ????}

        ????//?Evict?immediately?if?an?entry?was?collected
        ????K?victimKey?=?victim.getKey();
        ????K?candidateKey?=?candidate.getKey();
        ????if?(victimKey?==?null)?{
        ??????@NonNull?Node?evict?=?victim;
        ??????victim?=?victim.getNextInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
        ??????continue;
        ????}?else?if?(candidateKey?==?null)?{
        ??????candidates--;
        ??????@NonNull?Node?evict?=?candidate;
        ??????candidate?=?candidate.getPreviousInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
        ??????continue;
        ????}

        ????//放不下的節(jié)點直接處理掉
        ????if?(candidate.getPolicyWeight()?>?maximum())?{
        ??????candidates--;
        ??????Node?evict?=?candidate;
        ??????candidate?=?candidate.getPreviousInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
        ??????continue;
        ????}

        ????//根據(jù)節(jié)點的統(tǒng)計頻率frequency來做比較,看看要處理掉victim還是candidate
        ????//admit是具體的比較規(guī)則,看下面
        ????candidates--;
        ????//如果candidate勝出則淘汰victim
        ????if?(admit(candidateKey,?victimKey))?{
        ??????Node?evict?=?victim;
        ??????victim?=?victim.getNextInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
        ??????candidate?=?candidate.getPreviousInAccessOrder();
        ????}?else?{
        ??????//如果是victim勝出,則淘汰candidate
        ??????Node?evict?=?candidate;
        ??????candidate?=?candidate.getPreviousInAccessOrder();
        ??????evictEntry(evict,?RemovalCause.SIZE,?0L);
        ????}
        ??}
        }

        /**
        ?*?Determines?if?the?candidate?should?be?accepted?into?the?main?space,?as?determined?by?its
        ?*?frequency?relative?to?the?victim.?A?small?amount?of?randomness?is?used?to?protect?against?hash
        ?*?collision?attacks,?where?the?victim's?frequency?is?artificially?raised?so?that?no?new?entries
        ?*?are?admitted.
        ?*
        ?*?@param?candidateKey?the?key?for?the?entry?being?proposed?for?long?term?retention
        ?*?@param?victimKey?the?key?for?the?entry?chosen?by?the?eviction?policy?for?replacement
        ?*?@return?if?the?candidate?should?be?admitted?and?the?victim?ejected
        ?*/

        @GuardedBy("evictionLock")
        boolean?admit(K?candidateKey,?K?victimKey)?{
        ??//分別獲取victim和candidate的統(tǒng)計頻率
        ??//frequency這個方法的原理和實現(xiàn)上面已經(jīng)解釋了
        ??int?victimFreq?=?frequencySketch().frequency(victimKey);
        ??int?candidateFreq?=?frequencySketch().frequency(candidateKey);
        ??//誰大誰贏
        ??if?(candidateFreq?>?victimFreq)?{
        ????return?true;

        ????//如果相等,candidate小于5都當(dāng)輸了
        ??}?else?if?(candidateFreq?<=?5)?{
        ????//?The?maximum?frequency?is?15?and?halved?to?7?after?a?reset?to?age?the?history.?An?attack
        ????//?exploits?that?a?hot?candidate?is?rejected?in?favor?of?a?hot?victim.?The?threshold?of?a?warm
        ????//?candidate?reduces?the?number?of?random?acceptances?to?minimize?the?impact?on?the?hit?rate.
        ????return?false;
        ??}
        ??//如果相等且candidate大于5,則隨機(jī)淘汰一個
        ??int?random?=?ThreadLocalRandom.current().nextInt();
        ??return?((random?&?127)?==?0);
        }

        climb方法主要是用來調(diào)整 window size 的,使得 Caffeine 可以適應(yīng)你的應(yīng)用類型(如 OLAP 或 OLTP)表現(xiàn)出最佳的命中率。

        下圖是官方測試的數(shù)據(jù):

        我們看看 window size 的調(diào)整是怎么實現(xiàn)的。

        調(diào)整時用到的默認(rèn)比例數(shù)據(jù):

        //與上次命中率之差的閾值
        static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
        //步長(調(diào)整)的大?。ǜ畲笾祄aximum的比例)
        static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
        //步長的衰減比例
        static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
        ??/**?Adapts?the?eviction?policy?to?towards?the?optimal?recency?/?frequency?configuration.?*/
        //climb方法的主要作用就是動態(tài)調(diào)整window區(qū)的大?。ㄏ鄳?yīng)的,main區(qū)的大小也會發(fā)生變化,兩個之和為100%)。
        //因為區(qū)域的大小發(fā)生了變化,那么區(qū)域內(nèi)的數(shù)據(jù)也可能需要發(fā)生相應(yīng)的移動。
        @GuardedBy("evictionLock")
        void?climb()?{
        ??if?(!evicts())?{
        ????return;
        ??}
        ??//確定window需要調(diào)整的大小
        ??determineAdjustment();
        ??//如果protected區(qū)有溢出,把溢出部分移動到probation區(qū)。因為下面的操作有可能需要調(diào)整到protected區(qū)。
        ??demoteFromMainProtected();
        ??long?amount?=?adjustment();
        ??if?(amount?==?0)?{
        ????return;
        ??}?else?if?(amount?>?0)?{
        ????//增加window的大小
        ????increaseWindow();
        ??}?else?{
        ????//減少window的大小
        ????decreaseWindow();
        ??}
        }

        下面分別展開每個方法來解釋:

        /**?Calculates?the?amount?to?adapt?the?window?by?and?sets?{@link?#adjustment()}?accordingly.?*/
        @GuardedBy("evictionLock")
        void?determineAdjustment()?{
        ??//如果frequencySketch還沒初始化,則返回
        ??if?(frequencySketch().isNotInitialized())?{
        ????setPreviousSampleHitRate(0.0);
        ????setMissesInSample(0);
        ????setHitsInSample(0);
        ????return;
        ??}
        ??//總請求量?=?命中?+?miss
        ??int?requestCount?=?hitsInSample()?+?missesInSample();
        ??//沒達(dá)到sampleSize則返回
        ??//默認(rèn)下sampleSize = 10?* maximum。用sampleSize來判斷緩存是否足夠”熱“。
        ??if?(requestCount?????return;
        ??}

        ??//命中率的公式?=?命中?/?總請求
        ??double?hitRate?=?(double)?hitsInSample()?/?requestCount;
        ??//命中率的差值
        ??double?hitRateChange?=?hitRate?-?previousSampleHitRate();
        ??//本次調(diào)整的大小,是由命中率的差值和上次的stepSize決定的
        ??double?amount?=?(hitRateChange?>=?0)???stepSize()?:?-stepSize();
        ??//下次的調(diào)整大?。喝绻新实闹畲笥?.05,則重置為0.065 * maximum,否則按照0.98來進(jìn)行衰減
        ??double?nextStepSize?=?(Math.abs(hitRateChange)?>=?HILL_CLIMBER_RESTART_THRESHOLD)
        ????????HILL_CLIMBER_STEP_PERCENT?*?maximum()?*?(amount?>=?0???1?:?-1)
        ??????:?HILL_CLIMBER_STEP_DECAY_RATE?*?amount;
        ??setPreviousSampleHitRate(hitRate);
        ??setAdjustment((long)?amount);
        ??setStepSize(nextStepSize);
        ??setMissesInSample(0);
        ??setHitsInSample(0);
        }

        /**?Transfers?the?nodes?from?the?protected?to?the?probation?region?if?it?exceeds?the?maximum.?*/

        //這個方法比較簡單,減少protected區(qū)溢出的部分
        @GuardedBy("evictionLock")
        void?demoteFromMainProtected()?{
        ??long?mainProtectedMaximum?=?mainProtectedMaximum();
        ??long?mainProtectedWeightedSize?=?mainProtectedWeightedSize();
        ??if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
        ????return;
        ??}

        ??for?(int?i?=?0;?i?????if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
        ??????break;
        ????}

        ????Node?demoted?=?accessOrderProtectedDeque().poll();
        ????if?(demoted?==?null)?{
        ??????break;
        ????}
        ????demoted.makeMainProbation();
        ????accessOrderProbationDeque().add(demoted);
        ????mainProtectedWeightedSize?-=?demoted.getPolicyWeight();
        ??}
        ??setMainProtectedWeightedSize(mainProtectedWeightedSize);
        }

        /**
        ?*?Increases?the?size?of?the?admission?window?by?shrinking?the?portion?allocated?to?the?main
        ?*?space.?As?the?main?space?is?partitioned?into?probation?and?protected?regions?(80%?/?20%),?for
        ?*?simplicity?only?the?protected?is?reduced.?If?the?regions?exceed?their?maximums,?this?may?cause
        ?*?protected?items?to?be?demoted?to?the?probation?region?and?probation?items?to?be?demoted?to?the
        ?*?admission?window.
        ?*/


        //增加window區(qū)的大小,這個方法比較簡單,思路就像我上面說的
        @GuardedBy("evictionLock")
        void?increaseWindow()?{
        ??if?(mainProtectedMaximum()?==?0)?{
        ????return;
        ??}

        ??long?quota?=?Math.min(adjustment(),?mainProtectedMaximum());
        ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
        ??setWindowMaximum(windowMaximum()?+?quota);
        ??demoteFromMainProtected();

        ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderProbationDeque().peek();
        ????boolean?probation?=?true;
        ????if?((candidate?==?null)?||?(quota???????candidate?=?accessOrderProtectedDeque().peek();
        ??????probation?=?false;
        ????}
        ????if?(candidate?==?null)?{
        ??????break;
        ????}

        ????int?weight?=?candidate.getPolicyWeight();
        ????if?(quota???????break;
        ????}

        ????quota?-=?weight;
        ????if?(probation)?{
        ??????accessOrderProbationDeque().remove(candidate);
        ????}?else?{
        ??????setMainProtectedWeightedSize(mainProtectedWeightedSize()?-?weight);
        ??????accessOrderProtectedDeque().remove(candidate);
        ????}
        ????setWindowWeightedSize(windowWeightedSize()?+?weight);
        ????accessOrderWindowDeque().add(candidate);
        ????candidate.makeWindow();
        ??}

        ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
        ??setWindowMaximum(windowMaximum()?-?quota);
        ??setAdjustment(quota);
        }

        /**?Decreases?the?size?of?the?admission?window?and?increases?the?main's?protected?region.?*/
        //同上increaseWindow差不多,反操作
        @GuardedBy("evictionLock")
        void?decreaseWindow()?{
        ??if?(windowMaximum()?<=?1)?{
        ????return;
        ??}

        ??long?quota?=?Math.min(-adjustment(),?Math.max(0,?windowMaximum()?-?1));
        ??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
        ??setWindowMaximum(windowMaximum()?-?quota);

        ??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderWindowDeque().peek();
        ????if?(candidate?==?null)?{
        ??????break;
        ????}

        ????int?weight?=?candidate.getPolicyWeight();
        ????if?(quota???????break;
        ????}

        ????quota?-=?weight;
        ????setMainProtectedWeightedSize(mainProtectedWeightedSize()?+?weight);
        ????setWindowWeightedSize(windowWeightedSize()?-?weight);
        ????accessOrderWindowDeque().remove(candidate);
        ????accessOrderProbationDeque().add(candidate);
        ????candidate.makeMainProbation();
        ??}

        ??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
        ??setWindowMaximum(windowMaximum()?+?quota);
        ??setAdjustment(-quota);
        }

        以上,是 Caffeine 的 W-TinyLFU 策略的設(shè)計原理及代碼實現(xiàn)解析。

        異步的高性能讀寫

        一般的緩存每次對數(shù)據(jù)處理完之后(讀的話,已經(jīng)存在則直接返回,不存在則 load 數(shù)據(jù),保存,再返回;寫的話,則直接插入或更新),但是因為要維護(hù)一些淘汰策略,則需要一些額外的操作,諸如:

        • 計算和比較數(shù)據(jù)的是否過期
        • 統(tǒng)計頻率(像 LFU 或其變種)
        • 維護(hù) read queue 和 write queue
        • 淘汰符合條件的數(shù)據(jù)
        • 等等。。。

        這種數(shù)據(jù)的讀寫伴隨著緩存狀態(tài)的變更,Guava Cache 的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然 Guava Cache 巧妙地利用了 JDK 的 ConcurrentHashMap(分段鎖或者無鎖 CAS)來降低鎖的密度,達(dá)到提高并發(fā)度的目的。但是,對于一些熱點數(shù)據(jù),這種做法還是避免不了頻繁的鎖競爭。Caffeine 借鑒了數(shù)據(jù)庫系統(tǒng)的 WAL(Write-Ahead Logging)思想,即先寫日志再執(zhí)行操作,這種思想同樣適合緩存的,執(zhí)行讀寫操作時,先把操作記錄在緩沖區(qū),然后在合適的時機(jī)異步、批量地執(zhí)行緩沖區(qū)中的內(nèi)容。但在執(zhí)行緩沖區(qū)的內(nèi)容時,也是需要在緩沖區(qū)加上同步鎖的,不然存在并發(fā)問題,只不過這樣就可以把對鎖的競爭從緩存數(shù)據(jù)轉(zhuǎn)移到對緩沖區(qū)上。

        ReadBuffer

        在 Caffeine 的內(nèi)部實現(xiàn)中,為了很好的支持不同的 Features(如 Eviction,Removal,Refresh,Statistics,Cleanup,Policy 等等),擴(kuò)展了很多子類,它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的 readBuffer,看定義:

        final?Buffer>?readBuffer;

        readBuffer?=?evicts()?||?collectKeys()?||?collectValues()?||?expiresAfterAccess()
        ??????????new?BoundedBuffer<>()
        ????????:?Buffer.disabled();

        上面提到 Caffeine 對每次緩存的讀操作都會觸發(fā)afterRead

        /**
        ?*?Performs?the?post-processing?work?required?after?a?read.
        ?*
        ?*?@param?node?the?entry?in?the?page?replacement?policy
        ?*?@param?now?the?current?time,?in?nanoseconds
        ?*?@param?recordHit?if?the?hit?count?should?be?incremented
        ?*/

        void?afterRead(Node?node,?long?now,?boolean?recordHit)?{
        ??if?(recordHit)?{
        ????statsCounter().recordHits(1);
        ??}
        ??//把記錄加入到readBuffer
        ??//判斷是否需要立即處理readBuffer
        ??//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個
        ??boolean?delayable?=?skipReadBuffer()?||?(readBuffer.offer(node)?!=?Buffer.FULL);
        ??if?(shouldDrainBuffers(delayable))?{
        ????scheduleDrainBuffers();
        ??}
        ??refreshIfNeeded(node,?now);
        }

        ?/**
        ???*?Returns?whether?maintenance?work?is?needed.
        ???*
        ???*?@param?delayable?if?draining?the?read?buffer?can?be?delayed
        ???*/


        ??//caffeine用了一組狀態(tài)來定義和管理“維護(hù)”的過程
        ??boolean?shouldDrainBuffers(boolean?delayable)?{
        ????switch?(drainStatus())?{
        ??????case?IDLE:
        ????????return?!delayable;
        ??????case?REQUIRED:
        ????????return?true;
        ??????case?PROCESSING_TO_IDLE:
        ??????case?PROCESSING_TO_REQUIRED:
        ????????return?false;
        ??????default:
        ????????throw?new?IllegalStateException();
        ????}
        ??}

        重點看BoundedBuffer

        /**
        ?*?A?striped,?non-blocking,?bounded?buffer.
        ?*
        ?*?@author[email protected]?(Ben?Manes)
        ?*?@param??the?type?of?elements?maintained?by?this?buffer
        ?*/

        final?class?BoundedBuffer<E>?extends?StripedBuffer<E>

        它是一個 striped、非阻塞、有界限的 buffer,繼承于StripedBuffer類。下面看看StripedBuffer的實現(xiàn):

        /**
        ?*?A?base?class?providing?the?mechanics?for?supporting?dynamic?striping?of?bounded?buffers.?This
        ?*?implementation?is?an?adaption?of?the?numeric?64-bit?{@link?java.util.concurrent.atomic.Striped64}
        ?*?class,?which?is?used?by?atomic?counters.?The?approach?was?modified?to?lazily?grow?an?array?of
        ?*?buffers?in?order?to?minimize?memory?usage?for?caches?that?are?not?heavily?contended?on.
        ?*
        ?*?@author[email protected]?(Doug?Lea)
        ?*?@author[email protected]?(Ben?Manes)
        ?*/


        abstract?class?StripedBuffer<E>?implements?Buffer<E>

        這個StripedBuffer設(shè)計的思想是跟Striped64類似的,通過擴(kuò)展結(jié)構(gòu)把競爭熱點分離。

        具體實現(xiàn)是這樣的,StripedBuffer維護(hù)一個Buffer[]數(shù)組,每個元素就是一個RingBuffer,每個線程用自己threadLocalRandomProbe屬性作為 hash 值,這樣就相當(dāng)于每個線程都有自己“專屬”的RingBuffer,就不會產(chǎn)生競爭啦,而不是用 key 的hashCode作為 hash 值,因為會產(chǎn)生熱點數(shù)據(jù)問題。

        看看StripedBuffer的屬性

        /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
        //RingBuffer數(shù)組
        transient?volatile?Buffer?@Nullable[]?table;

        //當(dāng)進(jìn)行resize時,需要整個table鎖住。tableBusy作為CAS的標(biāo)記。
        static?final?long?TABLE_BUSY?=?UnsafeAccess.objectFieldOffset(StripedBuffer.class,?"tableBusy");
        static?final?long?PROBE?=?UnsafeAccess.objectFieldOffset(Thread.class,?"threadLocalRandomProbe");

        /**?Number?of?CPUS.?*/
        static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();

        /**?The?bound?on?the?table?size.?*/
        //table最大size
        static?final?int?MAXIMUM_TABLE_SIZE?=?4?*?ceilingNextPowerOfTwo(NCPU);

        /**?The?maximum?number?of?attempts?when?trying?to?expand?the?table.?*/
        //如果發(fā)生競爭時(CAS失?。┑膰L試次數(shù)
        static?final?int?ATTEMPTS?=?3;

        /**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
        //核心數(shù)據(jù)結(jié)構(gòu)
        transient?volatile?Buffer?@Nullable[]?table;

        /**?Spinlock?(locked?via?CAS)?used?when?resizing?and/or?creating?Buffers.?*/
        transient?volatile?int?tableBusy;

        /**?CASes?the?tableBusy?field?from?0?to?1?to?acquire?lock.?*/
        final?boolean?casTableBusy()?{
        ??return?UnsafeAccess.UNSAFE.compareAndSwapInt(this,?TABLE_BUSY,?0,?1);
        }

        /**
        ?*?Returns?the?probe?value?for?the?current?thread.?Duplicated?from?ThreadLocalRandom?because?of
        ?*?packaging?restrictions.
        ?*/

        static?final?int?getProbe()?{
        ??return?UnsafeAccess.UNSAFE.getInt(Thread.currentThread(),?PROBE);
        }

        offer方法,當(dāng)沒初始化或存在競爭時,則擴(kuò)容為 2 倍。

        實際是調(diào)用RingBuffer的 offer 方法,把數(shù)據(jù)追加到RingBuffer后面。

        @Override
        public?int?offer(E?e)?{
        ??int?mask;
        ??int?result?=?0;
        ??Buffer?buffer;
        ??//是否不存在競爭
        ??boolean?uncontended?=?true;
        ??Buffer[]?buffers?=?table
        ??//是否已經(jīng)初始化
        ??if?((buffers?==?null)
        ??????||?(mask?=?buffers.length?-?1)?0
        ??????//用thread的隨機(jī)值作為hash值,得到對應(yīng)位置的RingBuffer
        ??????||?(buffer?=?buffers[getProbe()?&?mask])?==?null
        ??????//檢查追加到RingBuffer是否成功
        ??????||?!(uncontended?=?((result?=?buffer.offer(e))?!=?Buffer.FAILED)))?{
        ????//其中一個符合條件則進(jìn)行擴(kuò)容
        ????expandOrRetry(e,?uncontended);
        ??}
        ??return?result;
        }

        /**
        ?*?Handles?cases?of?updates?involving?initialization,?resizing,?creating?new?Buffers,?and/or
        ?*?contention.?See?above?for?explanation.?This?method?suffers?the?usual?non-modularity?problems?of
        ?*?optimistic?retry?code,?relying?on?rechecked?sets?of?reads.
        ?*
        ?*?@param?e?the?element?to?add
        ?*?@param?wasUncontended?false?if?CAS?failed?before?call
        ?*/


        //這個方法比較長,但思路還是相對清晰的。
        @SuppressWarnings("PMD.ConfusingTernary")
        final?void?expandOrRetry(E?e,?boolean?wasUncontended)?{
        ??int?h;
        ??if?((h?=?getProbe())?==?0)?{
        ????ThreadLocalRandom.current();?//?force?initialization
        ????h?=?getProbe();
        ????wasUncontended?=?true;
        ??}
        ??boolean?collide?=?false;?//?True?if?last?slot?nonempty
        ??for?(int?attempt?=?0;?attempt?????Buffer[]?buffers;
        ????Buffer?buffer;
        ????int?n;
        ????if?(((buffers?=?table)?!=?null)?&&?((n?=?buffers.length)?>?0))?{
        ??????if?((buffer?=?buffers[(n?-?1)?&?h])?==?null)?{
        ????????if?((tableBusy?==?0)?&&?casTableBusy())?{?//?Try?to?attach?new?Buffer
        ??????????boolean?created?=?false;
        ??????????try?{?//?Recheck?under?lock
        ????????????Buffer[]?rs;
        ????????????int?mask,?j;
        ????????????if?(((rs?=?table)?!=?null)?&&?((mask?=?rs.length)?>?0)
        ????????????????&&?(rs[j?=?(mask?-?1)?&?h]?==?null))?{
        ??????????????rs[j]?=?create(e);
        ??????????????created?=?true;
        ????????????}
        ??????????}?finally?{
        ????????????tableBusy?=?0;
        ??????????}
        ??????????if?(created)?{
        ????????????break;
        ??????????}
        ??????????continue;?//?Slot?is?now?non-empty
        ????????}
        ????????collide?=?false;
        ??????}?else?if?(!wasUncontended)?{?//?CAS?already?known?to?fail
        ????????wasUncontended?=?true;??????//?Continue?after?rehash
        ??????}?else?if?(buffer.offer(e)?!=?Buffer.FAILED)?{
        ????????break;
        ??????}?else?if?(n?>=?MAXIMUM_TABLE_SIZE?||?table?!=?buffers)?{
        ????????collide?=?false;?//?At?max?size?or?stale
        ??????}?else?if?(!collide)?{
        ????????collide?=?true;
        ??????}?else?if?(tableBusy?==?0?&&?casTableBusy())?{
        ????????try?{
        ??????????if?(table?==?buffers)?{?//?Expand?table?unless?stale
        ????????????table?=?Arrays.copyOf(buffers,?n?<1);
        ??????????}
        ????????}?finally?{
        ??????????tableBusy?=?0;
        ????????}
        ????????collide?=?false;
        ????????continue;?//?Retry?with?expanded?table
        ??????}
        ??????h?=?advanceProbe(h);
        ????}?else?if?((tableBusy?==?0)?&&?(table?==?buffers)?&&?casTableBusy())?{
        ??????boolean?init?=?false;
        ??????try?{?//?Initialize?table
        ????????if?(table?==?buffers)?{
        ??????????@SuppressWarnings({"unchecked",?"rawtypes"})
        ??????????Buffer[]?rs?=?new?Buffer[1];
        ??????????rs[0]?=?create(e);
        ??????????table?=?rs;
        ??????????init?=?true;
        ????????}
        ??????}?finally?{
        ????????tableBusy?=?0;
        ??????}
        ??????if?(init)?{
        ????????break;
        ??????}
        ????}
        ??}
        }

        最后看看RingBuffer,注意RingBufferBoundedBuffer的內(nèi)部類。

        /**?The?maximum?number?of?elements?per?buffer.?*/
        static?final?int?BUFFER_SIZE?=?16;

        //?Assume?4-byte?references?and?64-byte?cache?line?(16?elements?per?line)
        //256長度,但是是以16為單位,所以最多存放16個元素
        static?final?int?SPACED_SIZE?=?BUFFER_SIZE?<4;
        static?final?int?SPACED_MASK?=?SPACED_SIZE?-?1;
        static?final?int?OFFSET?=?16;
        //RingBuffer數(shù)組
        final?AtomicReferenceArray?buffer;

        ?//插入方法
        ?@Override
        ?public?int?offer(E?e)?{
        ???long?head?=?readCounter;
        ???long?tail?=?relaxedWriteCounter();
        ???//用head和tail來限制個數(shù)
        ???long?size?=?(tail?-?head);
        ???if?(size?>=?SPACED_SIZE)?{
        ?????return?Buffer.FULL;
        ???}
        ???//tail追加16
        ???if?(casWriteCounter(tail,?tail?+?OFFSET))?{
        ?????//用tail“取余”得到下標(biāo)
        ?????int?index?=?(int)?(tail?&?SPACED_MASK);
        ?????//用unsafe.putOrderedObject設(shè)值
        ?????buffer.lazySet(index,?e);
        ?????return?Buffer.SUCCESS;
        ???}
        ???//如果CAS失敗則返回失敗
        ???return?Buffer.FAILED;
        ?}

        ?//用consumer來處理buffer的數(shù)據(jù)
        ?@Override
        ?public?void?drainTo(Consumer?consumer)?{
        ???long?head?=?readCounter;
        ???long?tail?=?relaxedWriteCounter();
        ???//判斷數(shù)據(jù)多少
        ???long?size?=?(tail?-?head);
        ???if?(size?==?0)?{
        ?????return;
        ???}
        ???do?{
        ?????int?index?=?(int)?(head?&?SPACED_MASK);
        ?????E?e?=?buffer.get(index);
        ?????if?(e?==?null)?{
        ???????//?not?published?yet
        ???????break;
        ?????}
        ?????buffer.lazySet(index,?null);
        ?????consumer.accept(e);
        ?????//head也跟tail一樣,每次遞增16
        ?????head?+=?OFFSET;
        ???}?while?(head?!=?tail);
        ???lazySetReadCounter(head);
        ?}

        注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。

        總的來說ReadBuffer有如下特點:

        • 使用 Striped-RingBuffer來提升對 buffer 的讀寫
        • 用 thread 的 hash 來避開熱點 key 的競爭
        • 允許寫入的丟失

        WriteBuffer

        writeBufferreadBuffer不一樣,主要體現(xiàn)在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發(fā)會更高,且 afterRead 顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite不允許丟失,且要求盡量馬上執(zhí)行。Caffeine 使用MPSC(Multiple Producer / Single Consumer)作為 buffer 數(shù)組,實現(xiàn)在MpscGrowableArrayQueue類,它是仿照JCToolsMpscGrowableArrayQueue來寫的。

        MPSC 允許無鎖的高并發(fā)寫入,但只允許一個消費者,同時也犧牲了部分操作。

        MPSC 我打算另外分析,這里不展開了。

        TimerWheel

        除了支持expireAfterAccessexpireAfterWrite之外(Guava Cache 也支持這兩個特性),Caffeine 還支持expireAfter。因為expireAfterAccessexpireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據(jù)某些條件而不一樣的,這就需要用戶自定義過期時間。

        先看看expireAfter的用法

        private?static?LoadingCache?cache?=?Caffeine.newBuilder()
        ????????.maximumSize(256L)
        ????????.initialCapacity(1)
        ????????//.expireAfterAccess(2,?TimeUnit.DAYS)
        ????????//.expireAfterWrite(2,?TimeUnit.HOURS)
        ????????.refreshAfterWrite(1,?TimeUnit.HOURS)
        ????????//自定義過期時間
        ????????.expireAfter(new?Expiry()?{
        ????????????//返回創(chuàng)建后的過期時間
        ????????????@Override
        ????????????public?long?expireAfterCreate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime)?{
        ????????????????return?0;
        ????????????}

        ????????????//返回更新后的過期時間
        ????????????@Override
        ????????????public?long?expireAfterUpdate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
        ????????????????return?0;
        ????????????}

        ????????????//返回讀取后的過期時間
        ????????????@Override
        ????????????public?long?expireAfterRead(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
        ????????????????return?0;
        ????????????}
        ????????})
        ????????.recordStats()
        ????????.build(new?CacheLoader()?{
        ????????????@Nullable
        ????????????@Override
        ????????????public?String?load(@NonNull?String?key)?throws?Exception?{
        ????????????????return?"value_"?+?key;
        ????????????}
        ????????});

        通過自定義過期時間,使得不同的 key 可以動態(tài)的得到不同的過期時間。

        注意,我把expireAfterAccessexpireAfterWrite注釋了,因為這兩個特性不能跟expireAfter一起使用。

        而當(dāng)使用了expireAfter特性后,Caffeine 會啟用一種叫“時間輪”的算法來實現(xiàn)這個功能。更多關(guān)于時間輪的介紹,可以看我的文章HashedWheelTimer 時間輪原理分析[6]

        好,重點來了,為什么要用時間輪?

        expireAfterAccessexpireAfterWrite的實現(xiàn)是用一個AccessOrderDeque雙端隊列,它是 FIFO 的,因為它們的過期時間是固定的,所以在隊列頭的數(shù)據(jù)肯定是最早過期的,要處理過期數(shù)據(jù)時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進(jìn)行排序&插入,這個代價太大了。于是,Caffeine 用了一種更加高效、優(yōu)雅的算法-時間輪。

        時間輪的結(jié)構(gòu):

        因為在我的對時間輪分析的文章里已經(jīng)說了時間輪的原理和機(jī)制了,所以我就不展開 Caffeine 對時間輪的實現(xiàn)了。

        Caffeine 對時間輪的實現(xiàn)在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

        看看元素加入到時間輪的schedule方法:

        /**
        ?*?Schedules?a?timer?event?for?the?node.
        ?*
        ?*?@param?node?the?entry?in?the?cache
        ?*/

        public?void?schedule(@NonNull?Node?node)?{
        ??Node?sentinel?=?findBucket(node.getVariableTime());
        ??link(sentinel,?node);
        }

        /**
        ?*?Determines?the?bucket?that?the?timer?event?should?be?added?to.
        ?*
        ?*?@param?time?the?time?when?the?event?fires
        ?*?@return?the?sentinel?at?the?head?of?the?bucket
        ?*/

        Node?findBucket(long?time)?{
        ??long?duration?=?time?-?nanos;
        ??int?length?=?wheel.length?-?1;
        ??for?(int?i?=?0;?i?????if?(duration?1])?{
        ??????long?ticks?=?(time?>>>?SHIFT[i]);
        ??????int?index?=?(int)?(ticks?&?(wheel[i].length?-?1));
        ??????return?wheel[i][index];
        ????}
        ??}
        ??return?wheel[length][0];
        }

        /**?Adds?the?entry?at?the?tail?of?the?bucket's?list.?*/
        void?link(Node?sentinel,?Node?node)?{
        ??node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
        ??node.setNextInVariableOrder(sentinel);

        ??sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
        ??sentinel.setPreviousInVariableOrder(node);
        }

        其他

        Caffeine 還有其他的優(yōu)化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

        總結(jié)

        Caffeien 是一個優(yōu)秀的本地緩存,通過使用 W-TinyLFU 算法, 高性能的 readBuffer 和 WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內(nèi)存占用等特點。

        參考資料

        TinyLFU 論文[7]

        Design Of A Modern Cache[8]

        Design Of A Modern Cache—Part Deux[9]

        Caffeine 的 github[10]

        參考資料

        [1]

        Caffeine:?https://github.com/ben-manes/caffeine

        [2]

        這里:?https://albenw.github.io/posts/df42dc84/

        [3]

        Benchmarks:?https://github.com/ben-manes/caffeine/wiki/Benchmarks

        [4]

        官方API說明文檔:?https://github.com/ben-manes/caffeine/wiki

        [5]

        這里:?https://github.com/ben-manes/caffeine/wiki/Guava

        [6]

        HashedWheelTimer時間輪原理分析:?https://albenw.github.io/posts/ec8df8c/

        [7]

        TinyLFU論文:?https://arxiv.org/abs/1512.00727

        [8]

        Design Of A Modern Cache:?http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html

        [9]

        Design Of A Modern Cache—Part Deux:?http://highscalability.com/blog/2019/2/25/design-of-a-modern-cachepart-deux.html

        [10]

        Caffeine的github:?https://github.com/ben-manes/caffeine

        推薦閱讀

        面試題:InnoDB 中一棵 B+ 樹能存多少行數(shù)據(jù)?【面試不翻車,翻車就跑路】

        我畫了35張圖就是為了讓你深入 AQS

        Map 集合怎么也有這么多坑?一不小心又踩了好幾個!


        瀏覽 54
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91视频操逼| 国产免费无码成人A片在线观看 | 一区二区三区在线 | 性爱精品视频 | www.偷拍| 内射国产精品 | 淫片一级国产 | 欧美黄片免费 | 在线午夜福利 | 91无码人妻精品一区二区蜜桃 |