1. 高性能無鎖并發(fā)框架 Disruptor,太強(qiáng)了!

        共 4499字,需瀏覽 9分鐘

         ·

        2020-09-21 11:00

        Java技術(shù)棧

        www.javastack.cn

        關(guān)注閱讀更多優(yōu)質(zhì)文章



        Disruptor是一個開源框架,研發(fā)的初衷是為了解決高并發(fā)下隊列鎖的問題,最早由LMAX提出并使用,能夠在無鎖的情況下實現(xiàn)隊列的并發(fā)操作,并號稱能夠在一個線程里每秒處理6百萬筆訂單

        官網(wǎng):http://lmax-exchange.github.io/disruptor/

        目前,包括Apache Storm、Camel、Log4j2在內(nèi)的很多知名項目都應(yīng)用了Disruptor以獲取高性能

        為什么會產(chǎn)生Disruptor框架

        「目前Java內(nèi)置隊列保證線程安全的方式:」

        ArrayBlockingQueue:基于數(shù)組形式的隊列,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;

        LinkedBlockingQueue:基于鏈表形式的隊列,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;

        ConcurrentLinkedQueue:基于鏈表形式的隊列,通過CAS的方式

        我們知道,在編程過程中,加鎖通常會嚴(yán)重地影響性能,所以盡量用無鎖方式,就產(chǎn)生了Disruptor這種無鎖高并發(fā)框架

        基本概念

        參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction#core-concepts

        RingBuffer——Disruptor底層數(shù)據(jù)結(jié)構(gòu)實現(xiàn),核心類,是線程間交換數(shù)據(jù)的中轉(zhuǎn)地;

        Sequencer——序號管理器,生產(chǎn)同步的實現(xiàn)者,負(fù)責(zé)消費者/生產(chǎn)者各自序號、序號柵欄的管理和協(xié)調(diào),Sequencer有單生產(chǎn)者,多生產(chǎn)者兩種不同的模式,里面實現(xiàn)了各種同步的算法;

        Sequence——序號,聲明一個序號,用于跟蹤ringbuffer中任務(wù)的變化和消費者的消費情況,disruptor里面大部分的并發(fā)代碼都是通過對Sequence的值同步修改實現(xiàn)的,而非鎖,這是disruptor高性能的一個主要原因;

        SequenceBarrier——序號柵欄,管理和協(xié)調(diào)生產(chǎn)者的游標(biāo)序號和各個消費者的序號,確保生產(chǎn)者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理

        EventProcessor——事件處理器,監(jiān)聽RingBuffer的事件,并消費可用事件,從RingBuffer讀取的事件會交由實際的生產(chǎn)者實現(xiàn)類來消費;它會一直偵聽下一個可用的序號,直到該序號對應(yīng)的事件已經(jīng)準(zhǔn)備好。

        EventHandler——業(yè)務(wù)處理器,是實際消費者的接口,完成具體的業(yè)務(wù)邏輯實現(xiàn),第三方實現(xiàn)該接口;代表著消費者。

        Producer——生產(chǎn)者接口,第三方線程充當(dāng)該角色,producer向RingBuffer寫入事件。

        Wait Strategy:Wait Strategy決定了一個消費者怎么等待生產(chǎn)者將事件(Event)放入Disruptor中。

        等待策略

        源碼地址:https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/WaitStrategy.java

        「BlockingWaitStrategy」

        Disruptor的默認(rèn)策略是BlockingWaitStrategy。在BlockingWaitStrategy內(nèi)部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)。

        「SleepingWaitStrategy」

        SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現(xiàn)循環(huán)等待。

        「YieldingWaitStrategy」

        YieldingWaitStrategy是可以使用在低延遲系統(tǒng)的策略之一。YieldingWaitStrategy將自旋以等待序列增加到適當(dāng)?shù)闹?。在循環(huán)體內(nèi),將調(diào)用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

        「BusySpinWaitStrategy」

        性能最好,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線程數(shù)小于CPU邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

        「PhasedBackoffWaitStrategy」

        自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲并不重要的場景。

        使用舉例

        參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

        <dependency>
        ??????<groupId>com.lmaxgroupId>
        ??????<artifactId>disruptorartifactId>
        ??????<version>3.3.4version>
        ???dependency>
        //定義事件event ?通過Disruptor 進(jìn)行交換的數(shù)據(jù)類型。
        public?class?LongEvent?{

        ????private?Long?value;

        ????public?Long?getValue()?{
        ????????return?value;
        ????}

        ????public?void?setValue(Long?value)?{
        ????????this.value?=?value;
        ????}

        }
        public?class?LongEventFactory?implements?EventFactory<LongEvent>?{
        ????public?LongEvent?newInstance()?{
        ????????return?new?LongEvent();
        ????}
        }
        //定義事件消費者
        public?class?LongEventHandler?implements?EventHandler<LongEvent>??{
        ????public?void?onEvent(LongEvent?event,?long?sequence,?boolean?endOfBatch)?throws?Exception?{
        ?????????System.out.println("消費者:"+event.getValue());
        ????}
        }
        //定義生產(chǎn)者
        public?class?LongEventProducer?{
        ????public?final?RingBuffer?ringBuffer;
        ????public?LongEventProducer(RingBuffer?ringBuffer)?{
        ????????this.ringBuffer?=?ringBuffer;
        ????}
        ????public?void?onData(ByteBuffer?byteBuffer)?{
        ????????//?1.ringBuffer?事件隊列?下一個槽
        ????????long?sequence?=?ringBuffer.next();
        ????????Long?data?=?null;
        ????????try?{
        ????????????//2.取出空的事件隊列
        ????????????LongEvent?longEvent?=?ringBuffer.get(sequence);
        ????????????data?=?byteBuffer.getLong(0);
        ????????????//3.獲取事件隊列傳遞的數(shù)據(jù)
        ????????????longEvent.setValue(data);
        ????????????try?{
        ????????????????Thread.sleep(10);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????//?TODO?Auto-generated?catch?block
        ????????????????e.printStackTrace();
        ????????????}
        ????????}?finally?{
        ????????????System.out.println("生產(chǎn)這準(zhǔn)備發(fā)送數(shù)據(jù)");
        ????????????//4.發(fā)布事件
        ????????????ringBuffer.publish(sequence);
        ????????}
        ????}
        }
        public?class?DisruptorMain?{
        ????public?static?void?main(String[]?args)?{
        ????????//?1.創(chuàng)建一個可緩存的線程?提供線程來出發(fā)Consumer?的事件處理
        ????????ExecutorService?executor?=?Executors.newCachedThreadPool();
        ????????//?2.創(chuàng)建工廠
        ????????EventFactory?eventFactory?=?new?LongEventFactory();
        ????????//?3.創(chuàng)建ringBuffer?大小
        ????????int?ringBufferSize?=?1024?*?1024;?//?ringBufferSize大小一定要是2的N次方
        ????????//?4.創(chuàng)建Disruptor
        ????????Disruptor?disruptor?=?new?Disruptor(eventFactory,?ringBufferSize,?executor,
        ????????????????ProducerType.SINGLE,?new?YieldingWaitStrategy());
        ????????//?5.連接消費端方法
        ????????disruptor.handleEventsWith(new?LongEventHandler());
        ????????//?6.啟動
        ????????disruptor.start();
        ????????//?7.創(chuàng)建RingBuffer容器
        ????????RingBuffer?ringBuffer?=?disruptor.getRingBuffer();
        ????????//?8.創(chuàng)建生產(chǎn)者
        ????????LongEventProducer?producer?=?new?LongEventProducer(ringBuffer);
        ????????//?9.指定緩沖區(qū)大小
        ????????ByteBuffer?byteBuffer?=?ByteBuffer.allocate(8);
        ????????for?(int?i?=?1;?i?<=?100;?i++)?{
        ????????????byteBuffer.putLong(0,?i);
        ????????????producer.onData(byteBuffer);
        ????????}
        ????????//10.關(guān)閉disruptor和executor
        ????????disruptor.shutdown();
        ????????executor.shutdown();
        ????}
        }

        核心設(shè)計原理

        Disruptor通過以下設(shè)計來解決隊列速度慢的問題:

        「環(huán)形數(shù)組結(jié)構(gòu):」

        為了避免垃圾回收,采用數(shù)組而非鏈表。同時,數(shù)組對處理器的緩存機(jī)制更加友好

        ?

        原因:CPU緩存是由很多個緩存行組成的。每個緩存行通常是64字節(jié),并且它有效地引用主內(nèi)存中的一塊兒地址。一個Java的long類型變量是8字節(jié),因此在一個緩存行中可以存8個long類型的變量。CPU每次從主存中拉取數(shù)據(jù)時,會把相鄰的數(shù)據(jù)也存入同一個緩存行。在訪問一個long數(shù)組的時候,如果數(shù)組中的一個值被加載到緩存中,它會自動加載另外7個。因此你能非??斓谋闅v這個數(shù)組。

        ?

        「元素位置定位:」

        數(shù)組長度2^n,通過位運算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

        「無鎖設(shè)計:」

        每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù),整個過程通過原子變量CAS,保證操作的線程安全

        數(shù)據(jù)結(jié)構(gòu)

        框架使用RingBuffer來作為隊列的數(shù)據(jù)結(jié)構(gòu),RingBuffer就是一個可自定義大小的環(huán)形數(shù)組。

        除數(shù)組外還有一個序列號(sequence),用以指向下一個可用的元素,供生產(chǎn)者與消費者使用。

        原理圖如下所示:

        Sequence

        mark:Disruptor通過順序遞增的序號來編號管理通過其進(jìn)行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理。

        「數(shù)組+序列號設(shè)計的優(yōu)勢是什么呢?」

        回顧一下HashMap,在知道索引(index)下標(biāo)的情況下,存與取數(shù)組上的元素時間復(fù)雜度只有O(1),而這個index我們可以通過序列號與數(shù)組的長度取模來計算得出,index=sequence % table.length。當(dāng)然也可以用位運算來計算效率更高,此時table.length必須是2的冪次方。

        寫數(shù)據(jù)流程

        單線程寫數(shù)據(jù)的流程:

        1. 申請寫入m個元素;
        2. 若是有m個元素可以入,則返回最大的序列號。這兒主要判斷是否會覆蓋未讀的元素;
        3. 若是返回的正確,則生產(chǎn)者開始寫入元素。

        使用場景

        經(jīng)過測試,Disruptor的的延時和吞吐量都比ArrayBlockingQueue優(yōu)秀很多,所以,當(dāng)你在使用ArrayBlockingQueue出現(xiàn)性能瓶頸的時候,你就可以考慮采用Disruptor的代替。

        參考:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

        當(dāng)然,Disruptor性能高并不是必然的,所以,是否使用還得經(jīng)過測試。

        Disruptor的最常用的場景就是“生產(chǎn)者-消費者”場景,對場景的就是“一個生產(chǎn)者、多個消費者”的場景,并且要求順序處理。

        舉個例子,我們從MySQL的BigLog文件中順序讀取數(shù)據(jù),然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產(chǎn)者,那個是一個生產(chǎn)者。而寫入到ElasticSearch,則嚴(yán)格要求順序,否則會出現(xiàn)問題,所以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣

        參考:

        https://tech.meituan.com/2016/11/18/disruptor.html

        https://github.com/LMAX-Exchange/disruptor/wiki





        關(guān)注Java技術(shù)??锤喔韶?/strong>



        戳原文獲取面試題資料!
        瀏覽 42
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
          
          

            1. 国产农村乱婬片A片AAA图片 | 免看一级a一片成人久久最新章节 | 影音先锋乱伦小说 | 欧美黄色大片免费在线观看 | 成年人性生活免费视频 |