1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        ConcurrentHashMap 是如何保證線程安全的,你知道么?

        共 4992字,需瀏覽 10分鐘

         ·

        2022-05-10 00:24

        點(diǎn)擊下方“IT牧場”,選擇“設(shè)為星標(biāo)”

        blog.csdn.net/qq_41737716/article/details/90549847

        • 01、前言
        • 02、相關(guān)概念
          • 03、Amdahl定律
        • 04、初始化數(shù)據(jù)結(jié)構(gòu)時的線程安全
          • 05、總結(jié)
        • 06、put操作的線程安全
          • 07、總結(jié)
        • 08、擴(kuò)容操作的線程安全
          • 09、擴(kuò)容時的get操作
          • 10、多線程協(xié)助擴(kuò)容
          • 11、在什么情況下會進(jìn)行擴(kuò)容操作?
          • 12、總結(jié)
        • 13、統(tǒng)計(jì)容器大小的線程安全
          • 14、假設(shè)當(dāng)前線程為第一個put的線程
          • 15、出現(xiàn)了線程競爭導(dǎo)致CAS失敗
          • 16、計(jì)數(shù)桶擴(kuò)容
          • 17、總結(jié)
        • 18、get操作的線程安全
        • 19、JDK1.7與1.8的不同實(shí)現(xiàn)
        • 20、總結(jié)

        01、前言

        閱讀此篇文章,你需要有以下知識基礎(chǔ)

        • Java內(nèi)存模型,可見性問題
        • CAS
        • HashMap底層原理

        我們知道,在日常開發(fā)中使用的HashMap是線程不安全的,而線程安全類HashTable只是簡單的在方法上加鎖實(shí)現(xiàn)線程安全,效率低下,所以在線程安全的環(huán)境下我們通常會使用ConcurrentHashMap,但是又為何需要學(xué)習(xí)ConcurrentHashMap?用不就完事了?我認(rèn)為學(xué)習(xí)其源碼有兩個好處:

        1. 更靈活的運(yùn)用ConcurrentHashMap
        2. 欣賞并發(fā)編程大師Doug Lea的作品,源碼中有很多值得我們學(xué)習(xí)的并發(fā)思想,要意識到,線程安全不僅僅只是加鎖

        我拋出以下問題:

        • ConcurrentHashMap是怎么做到線程安全的?
          • get方法如何線程安全地獲取key、value?
          • put方法如何線程安全地設(shè)置key、value?
          • size方法如果線程安全地獲取容器容量?
          • 底層數(shù)據(jù)結(jié)構(gòu)擴(kuò)容時如果保證線程安全?
          • 初始化數(shù)據(jù)結(jié)構(gòu)時如果保證線程安全?
        • ConcurrentHashMap并發(fā)效率是如何提高的?
          • 和加鎖相比較,為什么它比HashTable效率高?

        接下來,帶著問題來繼續(xù)看下去,欣賞并發(fā)大師精妙絕倫的并發(fā)藝術(shù)作品(以下討論基于JDK1.8)

        02、相關(guān)概念

        03、Amdahl定律

        此節(jié)定律描述均來自《Java并發(fā)編程實(shí)戰(zhàn)》一書

        假設(shè)F是必須被串行執(zhí)行的部分,N代表處理器數(shù)量,Speedup代表加速比,可以簡單理解為CPU使用率此公式告訴我們,當(dāng)N趨近無限大,加速比最大趨近于1/F,假設(shè)我們的程序有50%的部分需要串行執(zhí)行,就算處理器數(shù)量無限多,最高的加速比只能是2(20%的使用率),如果程序中僅有10%的部分需要串行執(zhí)行,最高的加速比可以達(dá)到9.2(92%的使用率),但我們的程序或多或少都一定會有串行執(zhí)行的部分,所以F不可能為0,所以,就算有無限多的CPU,加速比也不可能達(dá)到10(100%的使用率),下面給一張圖來表示串行執(zhí)行部分占比不同對利用率的影響:由此我們可以看出,程序中的可伸縮性(提升外部資源即可提升并發(fā)性能的比率)是由程序中串行執(zhí)行部分所影響的,而常見的串行執(zhí)行有鎖競爭(上下文切換消耗、等待、串行)等等,這給了我們一個啟發(fā),可以通過減少鎖競爭來優(yōu)化并發(fā)性能,而ConcurrentHashMap則使用了鎖分段(減小鎖范圍)、CAS(樂觀鎖,減小上下文切換開銷,無阻塞)等等技術(shù),下面來具體看看吧

        04、初始化數(shù)據(jù)結(jié)構(gòu)時的線程安全

        HashMap的底層數(shù)據(jù)結(jié)構(gòu)這里簡單帶過一下,不做過多贅述:大致是以一個Node對象數(shù)組來存放數(shù)據(jù),Hash沖突時會形成Node鏈表,在鏈表長度超過8,Node數(shù)組超過64時會將鏈表結(jié)構(gòu)轉(zhuǎn)換為紅黑樹,Node對象:

        static?class?Node<K,V>?implements?Map.Entry<K,V>?{
        ??final?int?hash;
        ??final?K?key;
        ??volatile?V?val;
        ??volatile?Node?next;
        ??...
        }

        值得注意的是,value和next指針使用了volatile來保證其可見性

        在JDK1.8中,初始化ConcurrentHashMap的時候這個Node[]數(shù)組是還未初始化的,會等到第一次put方法調(diào)用時才初始化:

        final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
        ????????if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
        ????????int?hash?=?spread(key.hashCode());
        ????????int?binCount?=?0;
        ????????for?(Node[]?tab?=?table;;)?{
        ????????????Node?f;?int?n,?i,?fh;
        ????????????//判斷Node數(shù)組為空
        ????????????if?(tab?==?null?||?(n?=?tab.length)?==?0)
        ????????????????//初始化Node數(shù)組
        ????????????????tab?=?initTable();
        ??????????...
        }

        此時是會有并發(fā)問題的,如果多個線程同時調(diào)用initTable初始化Node數(shù)組怎么辦?看看大師是如何處理的:

        private?final?Node[]?initTable()?{
        ??Node[]?tab;?int?sc;
        ??//每次循環(huán)都獲取最新的Node數(shù)組引用
        ??while?((tab?=?table)?==?null?||?tab.length?==?0)?{
        ????//sizeCtl是一個標(biāo)記位,若為-1也就是小于0,代表有線程在進(jìn)行初始化工作了
        ????if?((sc?=?sizeCtl)?0)
        ??????//讓出CPU時間片
        ??????Thread.yield();?//?lost?initialization?race;?just?spin
        ????//CAS操作,將本實(shí)例的sizeCtl變量設(shè)置為-1
        ????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{
        ??????//如果CAS操作成功了,代表本線程將負(fù)責(zé)初始化工作
        ??????try?{
        ????????//再檢查一遍數(shù)組是否為空
        ????????if?((tab?=?table)?==?null?||?tab.length?==?0)?{
        ??????????//在初始化Map時,sizeCtl代表數(shù)組大小,默認(rèn)16
        ??????????//所以此時n默認(rèn)為16
        ??????????int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;
        ??????????@SuppressWarnings("unchecked")
        ??????????//Node數(shù)組
        ??????????Node[]?nt?=?(Node[])new?Node[n];
        ??????????//將其賦值給table變量
        ??????????table?=?tab?=?nt;
        ??????????//通過位運(yùn)算,n減去n二進(jìn)制右移2位,相當(dāng)于乘以0.75
        ??????????//例如16經(jīng)過運(yùn)算為12,與乘0.75一樣,只不過位運(yùn)算更快
        ??????????sc?=?n?-?(n?>>>?2);
        ????????}
        ??????}?finally?{
        ????????//將計(jì)算后的sc(12)直接賦值給sizeCtl,表示達(dá)到12長度就擴(kuò)容
        ????????//由于這里只會有一個線程在執(zhí)行,直接賦值即可,沒有線程安全問題
        ????????//只需要保證可見性
        ????????sizeCtl?=?sc;
        ??????}
        ??????break;
        ????}
        ??}
        ??return?tab;
        }

        table變量使用了volatile來保證每次獲取到的都是最新寫入的值

        transient?volatile?Node[]?table;

        05、總結(jié)

        就算有多個線程同時進(jìn)行put操作,在初始化數(shù)組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進(jìn)行初始化,其他線程均只能等待。

        用到的并發(fā)技巧:

        • volatile變量(sizeCtl):它是一個標(biāo)記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證。
        • CAS操作:CAS操作保證了設(shè)置sizeCtl標(biāo)記位的原子性,保證了只有一個線程能設(shè)置成功

        06、put操作的線程安全

        直接看代碼:

        final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
        ??if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
        ??//對key的hashCode進(jìn)行散列
        ??int?hash?=?spread(key.hashCode());
        ??int?binCount?=?0;
        ??//一個無限循環(huán),直到put操作完成后退出循環(huán)
        ??for?(Node[]?tab?=?table;;)?{
        ????Node?f;?int?n,?i,?fh;
        ????//當(dāng)Node數(shù)組為空時進(jìn)行初始化
        ????if?(tab?==?null?||?(n?=?tab.length)?==?0)
        ??????tab?=?initTable();
        ????//Unsafe類volatile的方式取出hashCode散列后通過與運(yùn)算得出的Node數(shù)組下標(biāo)值對應(yīng)的Node對象
        ????//此時的Node對象若為空,則代表還未有線程對此Node進(jìn)行插入操作
        ????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
        ??????//直接CAS方式插入數(shù)據(jù)
        ??????if?(casTabAt(tab,?i,?null,
        ???????????????????new?Node(hash,?key,?value,?null)))
        ????????//插入成功,退出循環(huán)
        ????????break;???????????????????//?no?lock?when?adding?to?empty?bin
        ????}
        ????//查看是否在擴(kuò)容,先不看,擴(kuò)容再介紹
        ????else?if?((fh?=?f.hash)?==?MOVED)
        ??????//幫助擴(kuò)容
        ??????tab?=?helpTransfer(tab,?f);
        ????else?{
        ??????V?oldVal?=?null;
        ??????//對Node對象進(jìn)行加鎖
        ??????synchronized?(f)?{
        ????????//二次確認(rèn)此Node對象還是原來的那一個
        ????????if?(tabAt(tab,?i)?==?f)?{
        ??????????if?(fh?>=?0)?{
        ????????????binCount?=?1;
        ????????????//無限循環(huán),直到完成put
        ????????????for?(Node?e?=?f;;?++binCount)?{
        ??????????????K?ek;
        ??????????????//和HashMap一樣,先比較hash,再比較equals
        ??????????????if?(e.hash?==?hash?&&
        ??????????????????((ek?=?e.key)?==?key?||
        ???????????????????(ek?!=?null?&&?key.equals(ek))))?{
        ????????????????oldVal?=?e.val;
        ????????????????if?(!onlyIfAbsent)
        ??????????????????e.val?=?value;
        ????????????????break;
        ??????????????}
        ??????????????Node?pred?=?e;
        ??????????????if?((e?=?e.next)?==?null)?{
        ????????????????//和鏈表頭Node節(jié)點(diǎn)不沖突,就將其初始化為新Node作為上一個Node節(jié)點(diǎn)的next
        ????????????????//形成鏈表結(jié)構(gòu)
        ????????????????pred.next?=?new?Node(hash,?key,
        ??????????????????????????????????????????value,?null);
        ????????????????break;
        ??????????????}
        ????????????}
        ??????????}
        ??????????...
        }

        值得關(guān)注的是tabAt(tab, i)方法,其使用Unsafe類volatile的操作volatile式地查看值,保證每次獲取到的值都是最新的:

        static?final??Node?tabAt(Node[]?tab,?int?i)?{
        ??return?(Node)U.getObjectVolatile(tab,?((long)i?<}

        雖然上面的table變量加了volatile,但也只能保證其引用的可見性,并不能確保其數(shù)組中的對象是否是最新的,所以需要Unsafe類volatile式地拿到最新的Node

        07、總結(jié)

        由于其減小了鎖的粒度,若Hash完美不沖突的情況下,可同時支持n個線程同時put操作,n為Node數(shù)組大小,在默認(rèn)大小16下,可以支持最大同時16個線程無競爭同時操作且線程安全。當(dāng)hash沖突嚴(yán)重時,Node鏈表越來越長,將導(dǎo)致嚴(yán)重的鎖競爭,此時會進(jìn)行擴(kuò)容,將Node進(jìn)行再散列,下面會介紹擴(kuò)容的線程安全性??偨Y(jié)一下用到的并發(fā)技巧:

        1. 減小鎖粒度:將Node鏈表的頭節(jié)點(diǎn)作為鎖,若在默認(rèn)大小16情況下,將有16把鎖,大大減小了鎖競爭(上下文切換),就像開頭所說,將串行的部分最大化縮小,在理想情況下線程的put操作都為并行操作。同時直接鎖住頭節(jié)點(diǎn),保證了線程安全
        2. Unsafe的getObjectVolatile方法:此方法確保獲取到的值為最新

        08、擴(kuò)容操作的線程安全

        在擴(kuò)容時,ConcurrentHashMap支持多線程并發(fā)擴(kuò)容,在擴(kuò)容過程中同時支持get查數(shù)據(jù),若有線程put數(shù)據(jù),還會幫助一起擴(kuò)容,這種無阻塞算法,將并行最大化的設(shè)計(jì),堪稱一絕。

        先來看看擴(kuò)容代碼實(shí)現(xiàn):

        private?final?void?transfer(Node[]?tab,?Node[]?nextTab)?{
        ??int?n?=?tab.length,?stride;
        ??//根據(jù)機(jī)器CPU核心數(shù)來計(jì)算,一條線程負(fù)責(zé)Node數(shù)組中多長的遷移量
        ??if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?????//本線程分到的遷移量
        ????//假設(shè)為16(默認(rèn)也為16)
        ????stride?=?MIN_TRANSFER_STRIDE;?//?subdivide?range
        ??//nextTab若為空代表線程是第一個進(jìn)行遷移的
        ??//初始化遷移后的新Node數(shù)組
        ??if?(nextTab?==?null)?{????????????//?initiating
        ????try?{
        ??????@SuppressWarnings("unchecked")
        ??????//這里n為舊數(shù)組長度,左移一位相當(dāng)于乘以2
        ??????//例如原數(shù)組長度16,新數(shù)組長度則為32
        ??????Node[]?nt?=?(Node[])new?Node[n?<1];
        ??????nextTab?=?nt;
        ????}?catch?(Throwable?ex)?{??????//?try?to?cope?with?OOME
        ??????sizeCtl?=?Integer.MAX_VALUE;
        ??????return;
        ????}
        ????//設(shè)置nextTable變量為新數(shù)組
        ????nextTable?=?nextTab;
        ????//假設(shè)為16
        ????transferIndex?=?n;
        ??}
        ??//假設(shè)為32
        ??int?nextn?=?nextTab.length;
        ??//標(biāo)示Node對象,此對象的hash變量為-1
        ??//在get或者put時若遇到此Node,則可以知道當(dāng)前Node正在遷移
        ??//傳入nextTab對象
        ??ForwardingNode?fwd?=?new?ForwardingNode(nextTab);
        ??boolean?advance?=?true;
        ??boolean?finishing?=?false;?//?to?ensure?sweep?before?committing?nextTab
        ??for?(int?i?=?0,?bound?=?0;;)?{
        ????Node?f;?int?fh;
        ????while?(advance)?{
        ??????int?nextIndex,?nextBound;
        ??????//i為當(dāng)前正在處理的Node數(shù)組下標(biāo),每次處理一個Node節(jié)點(diǎn)就會自減1
        ??????if?(--i?>=?bound?||?finishing)
        ????????advance?=?false;
        ??????//假設(shè)nextIndex=16
        ??????else?if?((nextIndex?=?transferIndex)?<=?0)?{
        ????????i?=?-1;
        ????????advance?=?false;
        ??????}
        ??????//由以上假設(shè),nextBound就為0
        ??????//且將nextIndex設(shè)置為0
        ??????else?if?(U.compareAndSwapInt
        ???????????????(this,?TRANSFERINDEX,?nextIndex,
        ????????????????nextBound?=?(nextIndex?>?stride??
        ?????????????????????????????nextIndex?-?stride?:?0)))?{
        ????????//bound=0
        ????????bound?=?nextBound;
        ????????//i=16-1=15
        ????????i?=?nextIndex?-?1;
        ????????advance?=?false;
        ??????}
        ????}
        ????if?(i?0?||?i?>=?n?||?i?+?n?>=?nextn)?{
        ??????int?sc;
        ??????if?(finishing)?{
        ????????nextTable?=?null;
        ????????table?=?nextTab;
        ????????sizeCtl?=?(n?<1)?-?(n?>>>?1);
        ????????return;
        ??????}
        ??????if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{
        ????????if?((sc?-?2)?!=?resizeStamp(n)?<??????????return;
        ????????finishing?=?advance?=?true;
        ????????i?=?n;?//?recheck?before?commit
        ??????}
        ????}
        ????//此時i=15,取出Node數(shù)組下標(biāo)為15的那個Node,若為空則不需要遷移
        ????//直接設(shè)置占位標(biāo)示,代表此Node已處理完成
        ????else?if?((f?=?tabAt(tab,?i))?==?null)
        ??????advance?=?casTabAt(tab,?i,?null,?fwd);
        ????//檢測此Node的hash是否為MOVED,MOVED是一個常量-1,也就是上面說的占位Node的hash
        ????//如果是占位Node,證明此節(jié)點(diǎn)已經(jīng)處理過了,跳過i=15的處理,繼續(xù)循環(huán)
        ????else?if?((fh?=?f.hash)?==?MOVED)
        ??????advance?=?true;?//?already?processed
        ????else?{
        ??????//鎖住這個Node
        ??????synchronized?(f)?{
        ????????//確認(rèn)Node是原先的Node
        ????????if?(tabAt(tab,?i)?==?f)?{
        ??????????//ln為lowNode,低位Node,hn為highNode,高位Node
        ??????????//這兩個概念下面以圖來說明
        ??????????Node?ln,?hn;
        ??????????if?(fh?>=?0)?{
        ????????????//此時fh與原來Node數(shù)組長度進(jìn)行與運(yùn)算
        ????????????//如果高X位為0,此時runBit=0
        ????????????//如果高X位為1,此時runBit=1
        ????????????int?runBit?=?fh?&?n;
        ????????????Node?lastRun?=?f;
        ????????????for?(Node?p?=?f.next;?p?!=?null;?p?=?p.next)?{
        ??????????????//這里的Node,都是同一Node鏈表中的Node對象
        ??????????????int?b?=?p.hash?&?n;
        ??????????????if?(b?!=?runBit)?{
        ????????????????runBit?=?b;
        ????????????????lastRun?=?p;
        ??????????????}
        ????????????}
        ????????????//正如上面所說,runBit=0,表示此Node為低位Node
        ????????????if?(runBit?==?0)?{
        ??????????????ln?=?lastRun;
        ??????????????hn?=?null;
        ????????????}
        ????????????else?{
        ??????????????//Node為高位Node
        ??????????????hn?=?lastRun;
        ??????????????ln?=?null;
        ????????????}
        ????????????for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
        ??????????????int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;
        ??????????????//若hash和n與運(yùn)算為0,證明為低位Node,原理同上
        ??????????????if?((ph?&?n)?==?0)
        ????????????????ln?=?new?Node(ph,?pk,?pv,?ln);
        ??????????????//這里將高位Node與地位Node都各自組成了兩個鏈表
        ??????????????else
        ????????????????hn?=?new?Node(ph,?pk,?pv,?hn);
        ????????????}
        ????????????//將低位Node設(shè)置到新Node數(shù)組中,下標(biāo)為原來的位置
        ????????????setTabAt(nextTab,?i,?ln);
        ????????????//將高位Node設(shè)置到新Node數(shù)組中,下標(biāo)為原來的位置加上原Node數(shù)組長度
        ????????????setTabAt(nextTab,?i?+?n,?hn);
        ????????????//將此Node設(shè)置為占位Node,代表處理完成
        ????????????setTabAt(tab,?i,?fwd);
        ????????????//繼續(xù)循環(huán)
        ????????????advance?=?true;
        ??????????}
        ??????????....
        ????????}
        ??????}
        ????}
        ??}
        }

        這里說一下遷移時為什么要分一個ln(低位Node)、hn(高位Node),首先說一個現(xiàn)象:

        我們知道,在put值的時候,首先會計(jì)算hash值,再散列到指定的Node數(shù)組下標(biāo)中:

        //根據(jù)key的hashCode再散列
        int?hash?=?spread(key.hashCode());
        //使用(n?-?1)?&?hash?運(yùn)算,定位Node數(shù)組中下標(biāo)值
        (f?=?tabAt(tab,?i?=?(n?-?1)?&?hash);

        其中n為Node數(shù)組長度,這里假設(shè)為16。

        假設(shè)有一個key進(jìn)來,它的散列之后的hash=9,那么它的下標(biāo)值是多少呢?

        • (16 - 1)和 9 進(jìn)行與運(yùn)算 -> 0000 1111 和 0000 1001 結(jié)果還是 0000 1001 = 9

        假設(shè)Node數(shù)組需要擴(kuò)容,我們知道,擴(kuò)容是將數(shù)組長度增加兩倍,也就是32,那么下標(biāo)值會是多少呢?

        • (32 - 1)和 9 進(jìn)行與運(yùn)算 -> 0001 1111 和 0000 1001 結(jié)果還是9

        此時,我們把散列之后的hash換成20,那么會有怎樣的變化呢?

        • (16 - 1)和 20 進(jìn)行與運(yùn)算 -> 0000 1111 和 0001 0100 結(jié)果是 0000 0100 = 4
        • (32 - 1)和 20 進(jìn)行與運(yùn)算 -> 0001 1111 和 0001 0100 結(jié)果是 0001 0100 = 20

        此時細(xì)心的讀者應(yīng)該可以發(fā)現(xiàn),如果hash在高X位為1,(X為數(shù)組長度的二進(jìn)制-1的最高位),則擴(kuò)容時是需要變換在Node數(shù)組中的索引值的,不然就hash不到,丟失數(shù)據(jù),所以這里在遷移的時候?qū)⒏遆位為1的Node分類為hn,將高X位為0的Node分類為ln。

        回到代碼中:

        for?(Node?p?=?f;?p?!=?lastRun;?p?=?p.next)?{
        ??int?ph?=?p.hash;?
        ??K?pk?=?p.key;?
        ??V?pv?=?p.val;
        ??if?((ph?&?n)?==?0)
        ????ln?=?new?Node(ph,?pk,?pv,?ln);
        ??else
        ????hn?=?new?Node(ph,?pk,?pv,?hn);
        }

        這個操作將高低位組成了兩條鏈表結(jié)構(gòu),由下圖所示:然后將其CAS操作放入新的Node數(shù)組中:

        setTabAt(nextTab,?i,?ln);
        setTabAt(nextTab,?i?+?n,?hn);

        其中,低位鏈表放入原下標(biāo)處,而高位鏈表則需要加上原Node數(shù)組長度,其中為什么不多贅述,上面已經(jīng)舉例說明了,這樣就可以保證高位Node在遷移到新Node數(shù)組中依然可以使用hash算法散列到對應(yīng)下標(biāo)的數(shù)組中去了。

        最后將原Node數(shù)組中對應(yīng)下標(biāo)Node對象設(shè)置為fwd標(biāo)記Node,表示該節(jié)點(diǎn)遷移完成,到這里,一個節(jié)點(diǎn)的遷移就完成了,將進(jìn)行下一個節(jié)點(diǎn)的遷移,也就是i-1=14下標(biāo)的Node節(jié)點(diǎn)。

        09、擴(kuò)容時的get操作

        假設(shè)Node下標(biāo)為16的Node節(jié)點(diǎn)正在遷移,突然有一個線程進(jìn)來調(diào)用get方法,正好key又散列到下標(biāo)為16的節(jié)點(diǎn),此時怎么辦?

        public?V?get(Object?key)?{
        ??Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
        ??int?h?=?spread(key.hashCode());
        ??if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
        ??????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
        ????if?((eh?=?e.hash)?==?h)?{
        ??????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
        ????????return?e.val;
        ????}
        ????//假如Node節(jié)點(diǎn)的hash值小于0
        ????//則有可能是fwd節(jié)點(diǎn)
        ????else?if?(eh?0)
        ??????//調(diào)用節(jié)點(diǎn)對象的find方法查找值
        ??????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
        ????while?((e?=?e.next)?!=?null)?{
        ??????if?(e.hash?==?h?&&
        ??????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
        ????????return?e.val;
        ????}
        ??}
        ??return?null;
        }

        重點(diǎn)看有注釋的那兩行,在get操作的源碼中,會判斷Node中的hash是否小于0,是否還記得我們的占位Node,其hash為MOVED,為常量值-1,所以此時判斷線程正在遷移,委托給fwd占位Node去查找值:

        //內(nèi)部類?ForwardingNode中
        Node?find(int?h,?Object?k)?{
        ??//?loop?to?avoid?arbitrarily?deep?recursion?on?forwarding?nodes
        ??//?這里的查找,是去新Node數(shù)組中查找的
        ??//?下面的查找過程與HashMap查找無異,不多贅述
        ??outer:?for?(Node[]?tab?=?nextTable;;)?{
        ????Node?e;?int?n;
        ????if?(k?==?null?||?tab?==?null?||?(n?=?tab.length)?==?0?||
        ????????(e?=?tabAt(tab,?(n?-?1)?&?h))?==?null)
        ??????return?null;
        ????for?(;;)?{
        ??????int?eh;?K?ek;
        ??????if?((eh?=?e.hash)?==?h?&&
        ??????????((ek?=?e.key)?==?k?||?(ek?!=?null?&&?k.equals(ek))))
        ????????return?e;
        ??????if?(eh?0)?{
        ????????if?(e?instanceof?ForwardingNode)?{
        ??????????tab?=?((ForwardingNode)e).nextTable;
        ??????????continue?outer;
        ????????}
        ????????else
        ??????????return?e.find(h,?k);
        ??????}
        ??????if?((e?=?e.next)?==?null)
        ????????return?null;
        ????}
        ??}
        }

        到這里應(yīng)該可以恍然大悟了,之所以占位Node需要保存新Node數(shù)組的引用也是因?yàn)檫@個,它可以支持在遷移的過程中照樣不阻塞地查找值,可謂是精妙絕倫的設(shè)計(jì)。

        10、多線程協(xié)助擴(kuò)容

        在put操作時,假設(shè)正在遷移,正好有一個線程進(jìn)來,想要put值到遷移的Node上,怎么辦?

        final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
        ??if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();
        ??int?hash?=?spread(key.hashCode());
        ??int?binCount?=?0;
        ??for?(Node[]?tab?=?table;;)?{
        ????Node?f;?int?n,?i,?fh;
        ????if?(tab?==?null?||?(n?=?tab.length)?==?0)
        ??????tab?=?initTable();
        ????else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{
        ??????if?(casTabAt(tab,?i,?null,
        ???????????????????new?Node(hash,?key,?value,?null)))
        ????????break;???????????????????//?no?lock?when?adding?to?empty?bin
        ????}
        ????//若此時發(fā)現(xiàn)了占位Node,證明此時HashMap正在遷移
        ????else?if?((fh?=?f.hash)?==?MOVED)
        ??????//進(jìn)行協(xié)助遷移
        ??????tab?=?helpTransfer(tab,?f);
        ?????...
        }
        final?Node[]?helpTransfer(Node[]?tab,?Node?f)?{
        ??Node[]?nextTab;?int?sc;
        ??if?(tab?!=?null?&&?(f?instanceof?ForwardingNode)?&&
        ??????(nextTab?=?((ForwardingNode)f).nextTable)?!=?null)?{
        ????int?rs?=?resizeStamp(tab.length);
        ????while?(nextTab?==?nextTable?&&?table?==?tab?&&
        ???????????(sc?=?sizeCtl)?0)?{
        ??????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
        ??????????sc?==?rs?+?MAX_RESIZERS?||?transferIndex?<=?0)
        ????????break;
        ??????//sizeCtl加一,標(biāo)示多一個線程進(jìn)來協(xié)助擴(kuò)容
        ??????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))?{
        ????????//擴(kuò)容
        ????????transfer(tab,?nextTab);
        ????????break;
        ??????}
        ????}
        ????return?nextTab;
        ??}
        ??return?table;
        }

        此方法涉及大量復(fù)雜的位運(yùn)算,這里不多贅述,只是簡單的說幾句,此時sizeCtl變量用來標(biāo)示HashMap正在擴(kuò)容,當(dāng)其準(zhǔn)備擴(kuò)容時,會將sizeCtl設(shè)置為一個負(fù)數(shù),(例如數(shù)組長度為16時)其二進(jìn)制表示為:

        1000?0000?0001?1011?0000?0000?0000?0010

        無符號位為1,表示負(fù)數(shù)。其中高16位代表數(shù)組長度的一個位算法標(biāo)示(有點(diǎn)像epoch的作用,表示當(dāng)前遷移朝代為數(shù)組長度X),低16位表示有幾個線程正在做遷移,剛開始為2,接下來自增1,線程遷移完會進(jìn)行減1操作,也就是如果低十六位為2,代表有一個線程正在遷移,如果為3,代表2個線程正在遷移以此類推…

        只要數(shù)組長度足夠長,就可以同時容納足夠多的線程來一起擴(kuò)容,最大化并行任務(wù),提高性能。

        11、在什么情況下會進(jìn)行擴(kuò)容操作?

        • 在put值時,發(fā)現(xiàn)Node為占位Node(fwd)時,會協(xié)助擴(kuò)容

        • 在新增節(jié)點(diǎn)后,檢測到鏈表長度大于8時

          final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
          ??...
          ?if?(binCount?!=?0)?{
          ????//TREEIFY_THRESHOLD=8,當(dāng)鏈表長度大于8時
          ???if?(binCount?>=?TREEIFY_THRESHOLD)
          ??????//調(diào)用treeifyBin方法
          ?????treeifyBin(tab,?i);
          ???if?(oldVal?!=?null)
          ?????return?oldVal;
          ???break;
          ?}
          ??...
          }

          treeifyBin方法會將鏈表轉(zhuǎn)換為紅黑樹,增加查找效率,但在這之前,會檢查數(shù)組長度,若小于64,則會優(yōu)先做擴(kuò)容操作:

          private?final?void?treeifyBin(Node[]?tab,?int?index)?{
          ??Node?b;?int?n,?sc;
          ??if?(tab?!=?null)?{
          ????//MIN_TREEIFY_CAPACITY=64
          ????//若數(shù)組長度小于64,則先擴(kuò)容
          ????if?((n?=?tab.length)???????//擴(kuò)容
          ??????tryPresize(n?<1);
          ????else?if?((b?=?tabAt(tab,?index))?!=?null?&&?b.hash?>=?0)?{
          ??????synchronized?(b)?{
          ????????//...轉(zhuǎn)換為紅黑樹的操作
          ??????}
          ????}
          ??}
          }
        • 在每次新增節(jié)點(diǎn)之后,都會調(diào)用addCount方法,檢測Node數(shù)組大小是否達(dá)到閾值:

          final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{
          ??...
          ????//在下面一節(jié)會講到,此方法統(tǒng)計(jì)容器元素?cái)?shù)量
          ????addCount(1L,?binCount);
          ??return?null;
          }
          private?final?void?addCount(long?x,?int?check)?{
          ??CounterCell[]?as;?long?b,?s;
          ??if?((as?=?counterCells)?!=?null?||
          ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
          ????//統(tǒng)計(jì)元素個數(shù)的操作...
          ??}
          ??if?(check?>=?0)?{
          ????Node[]?tab,?nt;?int?n,?sc;
          ????//元素個數(shù)達(dá)到閾值,進(jìn)行擴(kuò)容
          ????while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&
          ???????????(n?=?tab.length)???????int?rs?=?resizeStamp(n);
          ??????//發(fā)現(xiàn)sizeCtl為負(fù)數(shù),證明有線程正在遷移
          ??????if?(sc?0)?{
          ????????if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||
          ????????????sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||
          ????????????transferIndex?<=?0)
          ??????????break;
          ????????if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))
          ??????????transfer(tab,?nt);
          ??????}
          ??????//不為負(fù)數(shù),則為第一個遷移的線程
          ??????else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,
          ???????????????????????????????????(rs?<2))
          ????????transfer(tab,?null);
          ??????s?=?sumCount();
          ????}
          ??}
          }

        12、總結(jié)

        ConcurrentHashMap運(yùn)用各類CAS操作,將擴(kuò)容操作的并發(fā)性能實(shí)現(xiàn)最大化,在擴(kuò)容過程中,就算有線程調(diào)用get查詢方法,也可以安全的查詢數(shù)據(jù),若有線程進(jìn)行put操作,還會協(xié)助擴(kuò)容,利用sizeCtl標(biāo)記位和各種volatile變量進(jìn)行CAS操作達(dá)到多線程之間的通信、協(xié)助,在遷移過程中只鎖一個Node節(jié)點(diǎn),即保證了線程安全,又提高了并發(fā)性能。

        13、統(tǒng)計(jì)容器大小的線程安全

        ConcurrentHashMap在每次put操作之后都會調(diào)用addCount方法,此方法用于統(tǒng)計(jì)容器大小且檢測容器大小是否達(dá)到閾值,若達(dá)到閾值需要進(jìn)行擴(kuò)容操作,這在上面也是有提到的。這一節(jié)重點(diǎn)討論容器大小的統(tǒng)計(jì)是如何做到線程安全且并發(fā)性能不低的。

        大部分的單機(jī)數(shù)據(jù)查詢優(yōu)化方案都會降低并發(fā)性能,就像緩存的存儲,在多線程環(huán)境下將有并發(fā)問題,所以會產(chǎn)生并行或者一系列并發(fā)沖突鎖競爭的問題,降低了并發(fā)性能。類似的,熱點(diǎn)數(shù)據(jù)也有這樣的問題,在多線程并發(fā)的過程中,熱點(diǎn)數(shù)據(jù)(頻繁被訪問的變量)是在每一個線程中幾乎或多或少都會訪問到的數(shù)據(jù),這將增加程序中的串行部分,回憶一下開頭所描述的,程序中的串行部分將影響并發(fā)的可伸縮性,使并發(fā)性能下降,這通常會成為并發(fā)程序性能的瓶頸。而在ConcurrentHashMap中,如何快速的統(tǒng)計(jì)容器大小更是一個很重要的議題,因?yàn)槿萜鲀?nèi)部需要依靠容器大小來考慮是否需要擴(kuò)容,而在客戶端而言需要調(diào)用此方法來知道容器有多少個元素,如果處理不好這種熱點(diǎn)數(shù)據(jù),并發(fā)性能將因?yàn)檫@個短板整體性能下降。

        試想一下,如果是你,你會如何設(shè)計(jì)這種熱點(diǎn)數(shù)據(jù)?是加鎖,還是進(jìn)行CAS操作?進(jìn)入ConcurrentHashMap中,看看大師是如何巧妙的運(yùn)用了并發(fā)技巧,提高熱點(diǎn)數(shù)據(jù)的并發(fā)性能。

        先用圖的方式來看看大致的實(shí)現(xiàn)思路:

        @sun.misc.Contended?static?final?class?CounterCell?{
        ??volatile?long?value;
        ??CounterCell(long?x)?{?value?=?x;?}
        }

        這是一個粗略的實(shí)現(xiàn),在設(shè)計(jì)中,使用了分而治之的思想,將每一個計(jì)數(shù)都分散到各個countCell對象里面(下面稱之為桶),使競爭最小化,又使用了CAS操作,就算有競爭,也可以對失敗了的線程進(jìn)行其他的處理。樂觀鎖的實(shí)現(xiàn)方式與悲觀鎖不同之處就在于樂觀鎖可以對競爭失敗了的線程進(jìn)行其他策略的處理,而悲觀鎖只能等待鎖釋放,所以這里使用CAS操作對競爭失敗的線程做了其他處理,很巧妙的運(yùn)用了CAS樂觀鎖。

        下面看看具體的代碼實(shí)現(xiàn)吧:

        //計(jì)數(shù),并檢查長度是否達(dá)到閾值
        private?final?void?addCount(long?x,?int?check)?{
        ??//計(jì)數(shù)桶
        ??CounterCell[]?as;?long?b,?s;
        ??//如果counterCells不為null,則代表已經(jīng)初始化了,直接進(jìn)入if語句塊
        ??//若競爭不嚴(yán)重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值
        ??if?((as?=?counterCells)?!=?null?||
        ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
        ????//進(jìn)入此語句塊有兩種可能
        ????//1.counterCells被初始化完成了,不為null
        ????//2.CAS操作遞增baseCount值失敗了,說明有競爭
        ????CounterCell?a;?long?v;?int?m;
        ????//標(biāo)志是否存在競爭
        ????boolean?uncontended?=?true;
        ????//1.先判斷計(jì)數(shù)桶是否還沒初始化,則as=null,進(jìn)入語句塊
        ????//2.判斷計(jì)數(shù)桶長度是否為空或,若是進(jìn)入語句塊
        ????//3.這里做了一個線程變量隨機(jī)數(shù),與上桶大小-1,若桶的這個位置為空,進(jìn)入語句塊
        ????//4.到這里說明桶已經(jīng)初始化了,且隨機(jī)的這個位置不為空,嘗試CAS操作使桶加1,失敗進(jìn)入語句塊
        ????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
        ????????(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||
        ????????!(uncontended?=
        ??????????U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
        ??????fullAddCount(x,?uncontended);
        ??????return;
        ????}
        ????if?(check?<=?1)
        ??????return;
        ????//統(tǒng)計(jì)容器大小
        ????s?=?sumCount();
        ??}
        ??...
        }

        14、假設(shè)當(dāng)前線程為第一個put的線程

        先假設(shè)當(dāng)前Map還未被put數(shù)據(jù),則addCount一定沒有被調(diào)用過,當(dāng)前線程第一個調(diào)用addCount方法,則此時countCell一定沒有被初始化,為null,則進(jìn)行如下判斷:

        if?((as?=?counterCells)?!=?null?||
        ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?

        這里的if判斷一定會走第二個判斷,先CAS增加變量baseCount的值:

        private?transient?volatile?long?baseCount;

        這個值有什么用呢?我們看看統(tǒng)計(jì)容器大小的方法sumCount:

        final?long?sumCount()?{
        ??//獲取計(jì)數(shù)桶
        ??CounterCell[]?as?=?counterCells;?CounterCell?a;
        ??//獲取baseCount,賦值給sum總數(shù)
        ??long?sum?=?baseCount;
        ??//若計(jì)數(shù)桶不為空,統(tǒng)計(jì)計(jì)數(shù)桶內(nèi)的值
        ??if?(as?!=?null)?{
        ????for?(int?i?=?0;?i???????//遍歷計(jì)數(shù)桶,將value值相加
        ??????if?((a?=?as[i])?!=?null)
        ????????sum?+=?a.value;
        ????}
        ??}
        ??return?sum;
        }

        這個方法的大體思路與我們開頭那張圖差不多,容器的大小其實(shí)是分為兩部分,開頭只說了計(jì)數(shù)桶的那部分,其實(shí)還有一個baseCount,在線程沒有競爭的情況下的統(tǒng)計(jì)值,換句話說,在增加容量的時候其實(shí)是先去做CAS遞增baseCount的。

        由此可見,統(tǒng)計(jì)容器大小其實(shí)是用了兩種思路:

        1. CAS方式直接遞增:在線程競爭不大的時候,直接使用CAS操作遞增baseCount值即可,這里說的競爭不大指的是CAS操作不會失敗的情況
        2. 分而治之桶計(jì)數(shù):若出現(xiàn)了CAS操作失敗的情況,則證明此時有線程競爭了,計(jì)數(shù)方式從CAS方式轉(zhuǎn)變?yōu)榉侄沃耐坝?jì)數(shù)方式

        15、出現(xiàn)了線程競爭導(dǎo)致CAS失敗

        此時出現(xiàn)了競爭,則不會再用CAS方式來計(jì)數(shù)了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的,最終一定會進(jìn)入fullAddCount方法來進(jìn)行初始化桶:

        ???private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{
        ????????int?h;
        ????????if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{
        ????????????ThreadLocalRandom.localInit();??????//?force?initialization
        ????????????h?=?ThreadLocalRandom.getProbe();
        ????????????wasUncontended?=?true;
        ????????}
        ????????boolean?collide?=?false;????????????????//?True?if?last?slot?nonempty
        ????????for?(;;)?{
        ????????????CounterCell[]?as;?CounterCell?a;?int?n;?long?v;
        ????????????...
        ????????????//如果計(jì)數(shù)桶!=null,證明已經(jīng)初始化,此時不走此語句塊
        ????????????if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{
        ??????????????...
        ????????????}
        ????????????//進(jìn)入此語句塊進(jìn)行計(jì)數(shù)桶的初始化
        ????????????//CAS設(shè)置cellsBusy=1,表示現(xiàn)在計(jì)數(shù)桶Busy中...
        ????????????else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&
        ?????????????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
        ????????????????//若有線程同時初始化計(jì)數(shù)桶,由于CAS操作只有一個線程進(jìn)入這里
        ????????????????boolean?init?=?false;
        ????????????????try?{???????????????????????????//?Initialize?table
        ????????????????????//再次確認(rèn)計(jì)數(shù)桶為空
        ????????????????????if?(counterCells?==?as)?{
        ????????????????????????//初始化一個長度為2的計(jì)數(shù)桶
        ????????????????????????CounterCell[]?rs?=?new?CounterCell[2];
        ????????????????????????//h為一個隨機(jī)數(shù),與上1則代表結(jié)果為0、1中隨機(jī)的一個
        ????????????????????????//也就是在0、1下標(biāo)中隨便選一個計(jì)數(shù)桶,x=1,放入1的值代表增加1個容量
        ????????????????????????rs[h?&?1]?=?new?CounterCell(x);
        ????????????????????????//將初始化好的計(jì)數(shù)桶賦值給ConcurrentHashMap
        ????????????????????????counterCells?=?rs;
        ????????????????????????init?=?true;
        ????????????????????}
        ????????????????}?finally?{
        ????????????????????//最后將busy標(biāo)識設(shè)置為0,表示不busy了
        ????????????????????cellsBusy?=?0;
        ????????????????}
        ????????????????if?(init)
        ????????????????????break;
        ????????????}
        ????????????//若有線程同時來初始化計(jì)數(shù)桶,則沒有搶到busy資格的線程就先來CAS遞增baseCount
        ????????????else?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))
        ????????????????break;??????????????????????????//?Fall?back?on?using?base
        ????????}
        ????}

        到這里就完成了計(jì)數(shù)桶的初始化工作,在之后的計(jì)數(shù)都將會使用計(jì)數(shù)桶方式來統(tǒng)計(jì)總數(shù)

        16、計(jì)數(shù)桶擴(kuò)容

        從上面的分析中我們知道,計(jì)數(shù)桶初始化之后長度為2,在競爭大的時候肯定是不夠用的,所以一定有計(jì)數(shù)桶的擴(kuò)容操作,所以現(xiàn)在就有兩個問題了:

        1. 什么條件下會進(jìn)行計(jì)數(shù)桶的擴(kuò)容?
        2. 擴(kuò)容操作是怎么樣的?

        假設(shè)此時是用計(jì)數(shù)桶方式進(jìn)行計(jì)數(shù):

        private?final?void?addCount(long?x,?int?check)?{
        ??CounterCell[]?as;?long?b,?s;
        ??if?((as?=?counterCells)?!=?null?||
        ??????!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{
        ????CounterCell?a;?long?v;?int?m;
        ????boolean?uncontended?=?true;
        ????//此時顯然會在計(jì)數(shù)桶數(shù)組中隨機(jī)選一個計(jì)數(shù)桶
        ????//然后使用CAS操作將此計(jì)數(shù)桶中的value+1
        ????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
        ????????(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||
        ????????!(uncontended?=
        ??????????U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{
        ??????//若CAS操作失敗,證明有競爭,進(jìn)入fullAddCount方法
        ??????fullAddCount(x,?uncontended);
        ??????return;
        ????}
        ????if?(check?<=?1)
        ??????return;
        ????s?=?sumCount();
        ??}
        ??...
        }

        進(jìn)入fullAddCount方法:

        private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{
        ??int?h;
        ??if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{
        ????ThreadLocalRandom.localInit();??????//?force?initialization
        ????h?=?ThreadLocalRandom.getProbe();
        ????wasUncontended?=?true;
        ??}
        ??boolean?collide?=?false;????????????????//?True?if?last?slot?nonempty
        ??for?(;;)?{
        ????CounterCell[]?as;?CounterCell?a;?int?n;?long?v;
        ????//計(jì)數(shù)桶初始化好了,一定是走這個if語句塊
        ????if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{
        ??????//從計(jì)數(shù)桶數(shù)組隨機(jī)選一個計(jì)數(shù)桶,若為null表示該桶位還沒線程遞增過
        ??????if?((a?=?as[(n?-?1)?&?h])?==?null)?{
        ????????//查看計(jì)數(shù)桶busy狀態(tài)是否被標(biāo)識
        ????????if?(cellsBusy?==?0)?{????????????//?Try?to?attach?new?Cell
        ??????????//若不busy,直接new一個計(jì)數(shù)桶
        ??????????CounterCell?r?=?new?CounterCell(x);?//?Optimistic?create
        ??????????//CAS操作,標(biāo)示計(jì)數(shù)桶busy中
        ??????????if?(cellsBusy?==?0?&&
        ??????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
        ????????????boolean?created?=?false;
        ????????????try?{???????????????//?Recheck?under?lock
        ??????????????CounterCell[]?rs;?int?m,?j;
        ??????????????//在鎖下再檢查一次計(jì)數(shù)桶為null
        ??????????????if?((rs?=?counterCells)?!=?null?&&
        ??????????????????(m?=?rs.length)?>?0?&&
        ??????????????????rs[j?=?(m?-?1)?&?h]?==?null)?{
        ????????????????//將剛剛創(chuàng)建的計(jì)數(shù)桶賦值給對應(yīng)位置
        ????????????????rs[j]?=?r;
        ????????????????created?=?true;
        ??????????????}
        ????????????}?finally?{
        ??????????????//標(biāo)示不busy了
        ??????????????cellsBusy?=?0;
        ????????????}
        ????????????if?(created)
        ??????????????break;
        ????????????continue;???????????//?Slot?is?now?non-empty
        ??????????}
        ????????}
        ????????collide?=?false;
        ??????}
        ??????else?if?(!wasUncontended)???????//?CAS?already?known?to?fail
        ????????wasUncontended?=?true;??????//?Continue?after?rehash
        ??????//走到這里代表計(jì)數(shù)桶不為null,嘗試遞增計(jì)數(shù)桶
        ??????else?if?(U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x))
        ????????break;
        ??????else?if?(counterCells?!=?as?||?n?>=?NCPU)
        ????????collide?=?false;????????????//?At?max?size?or?stale
        ??????//若CAS操作失敗了,到了這里,會先進(jìn)入一次,然后再走一次剛剛的for循環(huán)
        ??????//若是第二次for循環(huán),collide=true,則不會走進(jìn)去
        ??????else?if?(!collide)
        ????????collide?=?true;
        ??????//計(jì)數(shù)桶擴(kuò)容,一個線程若走了兩次for循環(huán),也就是進(jìn)行了多次CAS操作遞增計(jì)數(shù)桶失敗了
        ??????//則進(jìn)行計(jì)數(shù)桶擴(kuò)容,CAS標(biāo)示計(jì)數(shù)桶busy中
        ??????else?if?(cellsBusy?==?0?&&
        ???????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
        ????????try?{
        ??????????//確認(rèn)計(jì)數(shù)桶還是同一個
        ??????????if?(counterCells?==?as)?{//?Expand?table?unless?stale
        ????????????//將長度擴(kuò)大到2倍
        ????????????CounterCell[]?rs?=?new?CounterCell[n?<1];
        ????????????//遍歷舊計(jì)數(shù)桶,將引用直接搬過來
        ????????????for?(int?i?=?0;?i???????????????rs[i]?=?as[i];
        ????????????//賦值
        ????????????counterCells?=?rs;
        ??????????}
        ????????}?finally?{
        ??????????//取消busy狀態(tài)
        ??????????cellsBusy?=?0;
        ????????}
        ????????collide?=?false;
        ????????continue;???????????????????//?Retry?with?expanded?table
        ??????}
        ??????h?=?ThreadLocalRandom.advanceProbe(h);
        ????}
        ????else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&
        ?????????????U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{
        ??????//初始化計(jì)數(shù)桶...
        ????}
        ????else?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))
        ??????break;??????????????????????????//?Fall?back?on?using?base
        ??}
        }

        看到這里,想必已經(jīng)可以解決上面兩個問題了:

        1. 什么條件下會進(jìn)行計(jì)數(shù)桶的擴(kuò)容?

          答:在CAS操作遞增計(jì)數(shù)桶失敗了3次之后,會進(jìn)行擴(kuò)容計(jì)數(shù)桶操作,注意此時同時進(jìn)行了兩次隨機(jī)定位計(jì)數(shù)桶來進(jìn)行CAS遞增的,所以此時可以保證大概率是因?yàn)橛?jì)數(shù)桶不夠用了,才會進(jìn)行計(jì)數(shù)桶擴(kuò)容

        2. 擴(kuò)容操作是怎么樣的?

          答:計(jì)數(shù)桶長度增加到兩倍長度,數(shù)據(jù)直接遍歷遷移過來,由于計(jì)數(shù)桶不像HashMap數(shù)據(jù)結(jié)構(gòu)那么復(fù)雜,有hash算法的影響,加上計(jì)數(shù)桶只是存放一個long類型的計(jì)數(shù)值而已,所以直接賦值引用即可。

        17、總結(jié)

        個人感覺,統(tǒng)計(jì)容器大小的線程安全與擴(kuò)容ConcurrentHashMap這兩個算得上ConcurrentHashMap中最聰明的兩個并發(fā)設(shè)計(jì)了,閱讀此源碼的我看到這兩個操作的設(shè)計(jì),都忍不住拍手叫絕,我想,這或許也是一個看源碼的樂趣吧,站在巨人的肩膀看巨人的思想。

        總結(jié)一下計(jì)數(shù)中用到的并發(fā)技巧:

        1. 利用CAS遞增baseCount值來感知是否存在線程競爭,若競爭不大直接CAS遞增baseCount值即可,性能與直接baseCount++差別不大
        2. 若存在線程競爭,則初始化計(jì)數(shù)桶,若此時初始化計(jì)數(shù)桶的過程中也存在競爭,多個線程同時初始化計(jì)數(shù)桶,則沒有搶到初始化資格的線程直接嘗試CAS遞增baseCount值的方式完成計(jì)數(shù),最大化利用了線程的并行。此時使用計(jì)數(shù)桶計(jì)數(shù),分而治之的方式來計(jì)數(shù),此時兩個計(jì)數(shù)桶最大可提供兩個線程同時計(jì)數(shù),同時使用CAS操作來感知線程競爭,若兩個桶情況下CAS操作還是頻繁失?。ㄊ?次),則直接擴(kuò)容計(jì)數(shù)桶,變?yōu)?個計(jì)數(shù)桶,支持最大同時4個線程并發(fā)計(jì)數(shù),以此類推…同時使用位運(yùn)算和隨機(jī)數(shù)的方式"負(fù)載均衡"一樣的將線程計(jì)數(shù)請求接近均勻的落在各個計(jì)數(shù)桶中。

        18、get操作的線程安全

        對于get操作,其實(shí)沒有線程安全的問題,只有可見性的問題,只需要確保get的數(shù)據(jù)是線程之間可見的即可:

        public?V?get(Object?key)?{
        ??Node[]?tab;?Node?e,?p;?int?n,?eh;?K?ek;
        ??int?h?=?spread(key.hashCode());
        ??//此過程與HashMap的get操作無異,不多贅述
        ??if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&
        ??????(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{
        ????if?((eh?=?e.hash)?==?h)?{
        ??????if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))
        ????????return?e.val;
        ????}
        ????//當(dāng)hash<0,有可能是在遷移,使用fwd占位Node去查找新table中的數(shù)據(jù)
        ????else?if?(eh?0)
        ??????return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;
        ????while?((e?=?e.next)?!=?null)?{
        ??????if?(e.hash?==?h?&&
        ??????????((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))
        ????????return?e.val;
        ????}
        ??}
        ??return?null;
        }

        在get操作中除了增加了遷移的判斷以外,基本與HashMap的get操作無異,這里不多贅述,值得一提的是這里使用了tabAt方法Unsafe類volatile的方式去獲取Node數(shù)組中的Node,保證獲得到的Node是最新的

        static?final??Node?tabAt(Node[]?tab,?int?i)?{
        ??return?(Node)U.getObjectVolatile(tab,?((long)i?<}

        19、JDK1.7與1.8的不同實(shí)現(xiàn)

        JDK1.7的ConcurrentHashMap底層數(shù)據(jù)結(jié)構(gòu):其中1.7的實(shí)現(xiàn)也同樣采用了分段鎖的技術(shù),只不過多個一個segment,一個segment里對應(yīng)一個小HashMap,其中segment繼承了ReentrantLock,充當(dāng)了鎖的角色,一把鎖鎖一個小HashMap(相當(dāng)于多個Node),從1.8的實(shí)現(xiàn)來看, 鎖的粒度從多個Node級別又減小到一個Node級別,再度減小鎖競爭,減小程序同步的部分。

        20、總結(jié)

        不得不說,大師將CAS操作運(yùn)用的淋漓盡致,相信理解了以上源碼的讀者也可以學(xué)習(xí)到大師所運(yùn)用的并發(fā)技巧,不僅僅是在ConcurrentHashMap中,其實(shí)在大部分JUC的源碼中很多并發(fā)思想很值得我們?nèi)ラ喿x、學(xué)習(xí)。

        干貨分享

        最近將個人學(xué)習(xí)筆記整理成冊,使用PDF分享。關(guān)注我,回復(fù)如下代碼,即可獲得百度盤地址,無套路領(lǐng)?。?/p>

        ?001:《Java并發(fā)與高并發(fā)解決方案》學(xué)習(xí)筆記;?002:《深入JVM內(nèi)核——原理、診斷與優(yōu)化》學(xué)習(xí)筆記;?003:《Java面試寶典》?004:《Docker開源書》?005:《Kubernetes開源書》?006:《DDD速成(領(lǐng)域驅(qū)動設(shè)計(jì)速成)》?007:全部?008:加技術(shù)群討論

        加個關(guān)注不迷路

        喜歡就點(diǎn)個"在看"唄^_^

        瀏覽 54
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩爱爱片 | 看免费一级片 | 黄片视频色 | 男女拍拍免费视频 | 啊日出水了用力乖乖视频 | 天天艹天天艹天天艹 | 国产卡一卡二在线观看 | 怡春院中文字幕 | 高清mv妈妈我想你看完泪目了 | 日韩人妻一区二区三区 |