高性能無鎖并發(fā)框架 Disruptor,太強(qiáng)了!

Java技術(shù)棧
www.javastack.cn
關(guān)注閱讀更多優(yōu)質(zhì)文章
Disruptor是一個開源框架,研發(fā)的初衷是為了解決高并發(fā)下隊列鎖的問題,最早由LMAX提出并使用,能夠在無鎖的情況下實現(xiàn)隊列的并發(fā)操作,并號稱能夠在一個線程里每秒處理6百萬筆訂單
官網(wǎng):http://lmax-exchange.github.io/disruptor/
目前,包括Apache Storm、Camel、Log4j2在內(nèi)的很多知名項目都應(yīng)用了Disruptor以獲取高性能
為什么會產(chǎn)生Disruptor框架
「目前Java內(nèi)置隊列保證線程安全的方式:」
ArrayBlockingQueue:基于數(shù)組形式的隊列,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;
LinkedBlockingQueue:基于鏈表形式的隊列,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;
ConcurrentLinkedQueue:基于鏈表形式的隊列,通過CAS的方式
我們知道,在編程過程中,加鎖通常會嚴(yán)重地影響性能,所以盡量用無鎖方式,就產(chǎn)生了Disruptor這種無鎖高并發(fā)框架
基本概念
參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction#core-concepts
RingBuffer——Disruptor底層數(shù)據(jù)結(jié)構(gòu)實現(xiàn),核心類,是線程間交換數(shù)據(jù)的中轉(zhuǎn)地;
Sequencer——序號管理器,生產(chǎn)同步的實現(xiàn)者,負(fù)責(zé)消費者/生產(chǎn)者各自序號、序號柵欄的管理和協(xié)調(diào),Sequencer有單生產(chǎn)者,多生產(chǎn)者兩種不同的模式,里面實現(xiàn)了各種同步的算法;
Sequence——序號,聲明一個序號,用于跟蹤ringbuffer中任務(wù)的變化和消費者的消費情況,disruptor里面大部分的并發(fā)代碼都是通過對Sequence的值同步修改實現(xiàn)的,而非鎖,這是disruptor高性能的一個主要原因;
SequenceBarrier——序號柵欄,管理和協(xié)調(diào)生產(chǎn)者的游標(biāo)序號和各個消費者的序號,確保生產(chǎn)者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理
EventProcessor——事件處理器,監(jiān)聽RingBuffer的事件,并消費可用事件,從RingBuffer讀取的事件會交由實際的生產(chǎn)者實現(xiàn)類來消費;它會一直偵聽下一個可用的序號,直到該序號對應(yīng)的事件已經(jīng)準(zhǔn)備好。
EventHandler——業(yè)務(wù)處理器,是實際消費者的接口,完成具體的業(yè)務(wù)邏輯實現(xiàn),第三方實現(xiàn)該接口;代表著消費者。
Producer——生產(chǎn)者接口,第三方線程充當(dāng)該角色,producer向RingBuffer寫入事件。
Wait Strategy:Wait Strategy決定了一個消費者怎么等待生產(chǎn)者將事件(Event)放入Disruptor中。

等待策略
源碼地址:https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/WaitStrategy.java
「BlockingWaitStrategy」
Disruptor的默認(rèn)策略是BlockingWaitStrategy。在BlockingWaitStrategy內(nèi)部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn)。
「SleepingWaitStrategy」
SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現(xiàn)循環(huán)等待。
「YieldingWaitStrategy」
YieldingWaitStrategy是可以使用在低延遲系統(tǒng)的策略之一。YieldingWaitStrategy將自旋以等待序列增加到適當(dāng)?shù)闹?。在循環(huán)體內(nèi),將調(diào)用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
「BusySpinWaitStrategy」
性能最好,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線程數(shù)小于CPU邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
「PhasedBackoffWaitStrategy」
自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲并不重要的場景。
使用舉例
參考地址:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
<dependency>
??????<groupId>com.lmaxgroupId>
??????<artifactId>disruptorartifactId>
??????<version>3.3.4version>
???dependency>
//定義事件event ?通過Disruptor 進(jìn)行交換的數(shù)據(jù)類型。
public?class?LongEvent?{
????private?Long?value;
????public?Long?getValue()?{
????????return?value;
????}
????public?void?setValue(Long?value)?{
????????this.value?=?value;
????}
}
public?class?LongEventFactory?implements?EventFactory<LongEvent>?{
????public?LongEvent?newInstance()?{
????????return?new?LongEvent();
????}
}
//定義事件消費者
public?class?LongEventHandler?implements?EventHandler<LongEvent>??{
????public?void?onEvent(LongEvent?event,?long?sequence,?boolean?endOfBatch)?throws?Exception?{
?????????System.out.println("消費者:"+event.getValue());
????}
}
//定義生產(chǎn)者
public?class?LongEventProducer?{
????public?final?RingBuffer?ringBuffer;
????public?LongEventProducer(RingBuffer?ringBuffer) ?{
????????this.ringBuffer?=?ringBuffer;
????}
????public?void?onData(ByteBuffer?byteBuffer)?{
????????//?1.ringBuffer?事件隊列?下一個槽
????????long?sequence?=?ringBuffer.next();
????????Long?data?=?null;
????????try?{
????????????//2.取出空的事件隊列
????????????LongEvent?longEvent?=?ringBuffer.get(sequence);
????????????data?=?byteBuffer.getLong(0);
????????????//3.獲取事件隊列傳遞的數(shù)據(jù)
????????????longEvent.setValue(data);
????????????try?{
????????????????Thread.sleep(10);
????????????}?catch?(InterruptedException?e)?{
????????????????//?TODO?Auto-generated?catch?block
????????????????e.printStackTrace();
????????????}
????????}?finally?{
????????????System.out.println("生產(chǎn)這準(zhǔn)備發(fā)送數(shù)據(jù)");
????????????//4.發(fā)布事件
????????????ringBuffer.publish(sequence);
????????}
????}
}
public?class?DisruptorMain?{
????public?static?void?main(String[]?args)?{
????????//?1.創(chuàng)建一個可緩存的線程?提供線程來出發(fā)Consumer?的事件處理
????????ExecutorService?executor?=?Executors.newCachedThreadPool();
????????//?2.創(chuàng)建工廠
????????EventFactory?eventFactory?=?new?LongEventFactory();
????????//?3.創(chuàng)建ringBuffer?大小
????????int?ringBufferSize?=?1024?*?1024;?//?ringBufferSize大小一定要是2的N次方
????????//?4.創(chuàng)建Disruptor
????????Disruptor?disruptor?=?new?Disruptor(eventFactory,?ringBufferSize,?executor,
????????????????ProducerType.SINGLE,?new?YieldingWaitStrategy());
????????//?5.連接消費端方法
????????disruptor.handleEventsWith(new?LongEventHandler());
????????//?6.啟動
????????disruptor.start();
????????//?7.創(chuàng)建RingBuffer容器
????????RingBuffer?ringBuffer?=?disruptor.getRingBuffer();
????????//?8.創(chuàng)建生產(chǎn)者
????????LongEventProducer?producer?=?new?LongEventProducer(ringBuffer);
????????//?9.指定緩沖區(qū)大小
????????ByteBuffer?byteBuffer?=?ByteBuffer.allocate(8);
????????for?(int?i?=?1;?i?<=?100;?i++)?{
????????????byteBuffer.putLong(0,?i);
????????????producer.onData(byteBuffer);
????????}
????????//10.關(guān)閉disruptor和executor
????????disruptor.shutdown();
????????executor.shutdown();
????}
}
核心設(shè)計原理
Disruptor通過以下設(shè)計來解決隊列速度慢的問題:
「環(huán)形數(shù)組結(jié)構(gòu):」
為了避免垃圾回收,采用數(shù)組而非鏈表。同時,數(shù)組對處理器的緩存機(jī)制更加友好
?原因:CPU緩存是由很多個緩存行組成的。每個緩存行通常是64字節(jié),并且它有效地引用主內(nèi)存中的一塊兒地址。一個Java的long類型變量是8字節(jié),因此在一個緩存行中可以存8個long類型的變量。CPU每次從主存中拉取數(shù)據(jù)時,會把相鄰的數(shù)據(jù)也存入同一個緩存行。在訪問一個long數(shù)組的時候,如果數(shù)組中的一個值被加載到緩存中,它會自動加載另外7個。因此你能非??斓谋闅v這個數(shù)組。
?
「元素位置定位:」
數(shù)組長度2^n,通過位運算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
「無鎖設(shè)計:」
每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù),整個過程通過原子變量CAS,保證操作的線程安全
數(shù)據(jù)結(jié)構(gòu)
框架使用RingBuffer來作為隊列的數(shù)據(jù)結(jié)構(gòu),RingBuffer就是一個可自定義大小的環(huán)形數(shù)組。
除數(shù)組外還有一個序列號(sequence),用以指向下一個可用的元素,供生產(chǎn)者與消費者使用。
原理圖如下所示:
Sequence
mark:Disruptor通過順序遞增的序號來編號管理通過其進(jìn)行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理。
「數(shù)組+序列號設(shè)計的優(yōu)勢是什么呢?」
回顧一下HashMap,在知道索引(index)下標(biāo)的情況下,存與取數(shù)組上的元素時間復(fù)雜度只有O(1),而這個index我們可以通過序列號與數(shù)組的長度取模來計算得出,index=sequence % table.length。當(dāng)然也可以用位運算來計算效率更高,此時table.length必須是2的冪次方。
寫數(shù)據(jù)流程
單線程寫數(shù)據(jù)的流程:
申請寫入m個元素; 若是有m個元素可以入,則返回最大的序列號。這兒主要判斷是否會覆蓋未讀的元素; 若是返回的正確,則生產(chǎn)者開始寫入元素。 
使用場景
經(jīng)過測試,Disruptor的的延時和吞吐量都比ArrayBlockingQueue優(yōu)秀很多,所以,當(dāng)你在使用ArrayBlockingQueue出現(xiàn)性能瓶頸的時候,你就可以考慮采用Disruptor的代替。
參考:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

當(dāng)然,Disruptor性能高并不是必然的,所以,是否使用還得經(jīng)過測試。
Disruptor的最常用的場景就是“生產(chǎn)者-消費者”場景,對場景的就是“一個生產(chǎn)者、多個消費者”的場景,并且要求順序處理。
舉個例子,我們從MySQL的BigLog文件中順序讀取數(shù)據(jù),然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產(chǎn)者,那個是一個生產(chǎn)者。而寫入到ElasticSearch,則嚴(yán)格要求順序,否則會出現(xiàn)問題,所以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣
參考:
https://tech.meituan.com/2016/11/18/disruptor.html
https://github.com/LMAX-Exchange/disruptor/wiki





關(guān)注Java技術(shù)??锤喔韶?/strong>


