1. 多線程設(shè)計模式:保護(hù)性暫停模式詳解以及其在DUBBO應(yīng)用源碼分析

        共 14379字,需瀏覽 29分鐘

         ·

        2020-11-06 02:54


        JAVA前線?


        互聯(lián)網(wǎng)技術(shù)人思考與分享,歡迎長按關(guān)注


        1 文章概述

        在多線程編程實(shí)踐中,我們肯定會面臨線程間數(shù)據(jù)交互的問題。在處理這類問題時需要使用一些設(shè)計模式,從而保證程序的正確性和健壯性。

        保護(hù)性暫停設(shè)計模式就是解決多線程間數(shù)據(jù)交互問題的一種模式。本文先從基礎(chǔ)案例介紹保護(hù)性暫?;靖拍詈蛯?shí)踐,再由淺入深,最終分析DUBBO源碼中保護(hù)性暫停設(shè)計模式使用場景。


        2 什么是保護(hù)性暫停

        我們設(shè)想這樣一種場景:線程A生產(chǎn)數(shù)據(jù),線程B讀取數(shù)據(jù)這個數(shù)據(jù)。

        但是有一種情況:線程B準(zhǔn)備讀取數(shù)據(jù)時,此時線程A還沒有生產(chǎn)出數(shù)據(jù)。

        在這種情況下線程B不能一直空轉(zhuǎn),也不能立即退出,線程B要等到生產(chǎn)數(shù)據(jù)完成并拿到數(shù)據(jù)之后才退出。

        那么在數(shù)據(jù)沒有生產(chǎn)出這段時間,線程B需要執(zhí)行一種等待機(jī)制,這樣可以達(dá)到對系統(tǒng)保護(hù)目的,這就是保護(hù)性暫停。

        保護(hù)性暫停有多種實(shí)現(xiàn)方式,本文我們用synchronized/wait/notify的方式實(shí)現(xiàn)。

        @Getter
        @Setter
        public?class?MyData?implements?Serializable?{
        ????private?static?final?long?serialVersionUID?=?1L;
        ????private?String?message;

        ????public?MyData(String?message)?{
        ????????this.message?=?message;
        ????}
        }

        class?Resource1?{
        ????private?MyData?data;
        ????private?Object?lock?=?new?Object();

        ????public?MyData?getData()?{
        ????????synchronized?(lock)?{
        ????????????while?(data?==?null)?{
        ????????????????try?{
        ????????????????????//?沒有數(shù)據(jù)則釋放鎖并暫停等待被喚醒
        ????????????????????lock.wait();
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????????return?data;
        ????????}
        ????}

        ????public?void?sendData(MyData?data)?{
        ????????synchronized?(lock)?{
        ????????????//?生產(chǎn)數(shù)據(jù)后喚醒消費(fèi)線程
        ????????????this.data?=?data;
        ????????????lock.notifyAll();
        ????????}
        ????}
        }

        /**
        ?*?保護(hù)性暫停實(shí)例一
        ?*
        ?*?@author?微信公眾號「JAVA前線」
        ?*/

        public?class?ProtectDesignTest1?{

        ????public?static?void?main(String[]?args)?{
        ????????Resource1?resource?=?new?Resource1();
        ????????new?Thread(()?->?{
        ????????????try?{
        ????????????????MyData?data?=?new?MyData("hello");
        ????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
        ????????????????//?模擬發(fā)送耗時
        ????????????????TimeUnit.SECONDS.sleep(3);
        ????????????????resource.sendData(data);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????},?"t1").start();

        ????????new?Thread(()?->?{
        ????????????MyData?data?=?resource.getData();
        ????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
        ????????},?"t2").start();
        ????}
        }

        在上述實(shí)例中線程1生產(chǎn)數(shù)據(jù),線程2消費(fèi)數(shù)據(jù)。Resource1類中通過wait/notify實(shí)現(xiàn)了保護(hù)性暫停設(shè)計模式。


        3 加一個超時時間

        上述實(shí)例中如果線程2沒有獲取到數(shù)據(jù),那么線程2直到拿到數(shù)據(jù)才會退出。現(xiàn)在我們給獲取數(shù)據(jù)指定一個超時時間,如果在這個時間內(nèi)沒有獲取到數(shù)據(jù)則拋出超時異常。雖然只是加一個參數(shù),但是其中有很多細(xì)節(jié)需要注意。

        3.1 一段有問題的代碼

        我們分析下面這段代碼

        class?Resource2?{
        ????private?MyData?data;
        ????private?Object?lock?=?new?Object();

        ????public?MyData?getData(int?timeOut)?{
        ????????synchronized?(lock)?{
        ????????????while?(data?==?null)?{
        ????????????????try?{
        ????????????????????//?代碼1
        ????????????????????lock.wait(timeOut);
        ????????????????????break;
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????????if?(data?==?null)?{
        ????????????????throw?new?RuntimeException("超時未獲取到結(jié)果");
        ????????????}
        ????????????return?data;
        ????????}
        ????}

        ????public?void?sendData(MyData?data)?{
        ????????synchronized?(lock)?{
        ????????????this.data?=?data;
        ????????????lock.notifyAll();
        ????????}
        ????}
        }


        /**
        ?*?保護(hù)性暫停實(shí)例二
        ?*
        ?*?@author?微信公眾號「JAVA前線」
        ?*/

        public?class?ProtectDesignTest2?{

        ????public?static?void?main(String[]?args)?{
        ????????Resource2?resource?=?new?Resource2();
        ????????new?Thread(()?->?{
        ????????????try?{
        ????????????????MyData?data?=?new?MyData("hello");
        ????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
        ????????????????//?模擬發(fā)送耗時
        ????????????????TimeUnit.SECONDS.sleep(3);
        ????????????????resource.sendData(data);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????},?"t1").start();

        ????????new?Thread(()?->?{
        ????????????MyData?data?=?resource.getData(1000);
        ????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
        ????????},?"t2").start();
        ????}
        }

        這段代碼看似沒有問題,使用的也是wait帶有超時時間的參數(shù),那么問題可能出在哪里呢?

        問題是線程虛假喚醒帶來的。如果還沒有到超時時間代碼1就被虛假喚醒,此時data還沒有值就會直接跳出循環(huán),這樣沒有達(dá)到我們預(yù)期的超時時間才跳出循環(huán)的預(yù)期。

        關(guān)于虛假喚醒這個概念,我們看看JDK官方文檔相關(guān)介紹。

        A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one:

        synchronized?(obj)?{
        ????while?()
        ????????obj.wait(timeout);
        }

        官方文檔告訴我們一個線程可能會在沒有被notify時虛假喚醒,所以判斷是否繼續(xù)wait時必須用while循環(huán)。我們在寫代碼時一定也要注意線程虛假喚醒問題。

        3.2 正確實(shí)例

        上面我們明白了虛假喚醒問題,現(xiàn)在我們對代碼進(jìn)行修改,說明參看代碼注釋。

        class?Resource3?{
        ????private?MyData?data;
        ????private?Object?lock?=?new?Object();

        ????public?MyData?getData(int?timeOut)?{
        ????????synchronized?(lock)?{
        ????????????//?運(yùn)行時長
        ????????????long?timePassed?=?0;
        ????????????//?開始時間
        ????????????long?begin?=?System.currentTimeMillis();
        ????????????//?如果結(jié)果為空
        ????????????while?(data?==?null)?{
        ????????????????try?{
        ????????????????????//?如果運(yùn)行時長大于超時時間退出循環(huán)
        ????????????????????if?(timePassed?>?timeOut)?{
        ????????????????????????break;
        ????????????????????}
        ????????????????????//?如果運(yùn)行時長小于超時時間表示虛假喚醒?->?只需再等待時間差值
        ????????????????????long?waitTime?=?timeOut?-?timePassed;

        ????????????????????//?等待時間差值
        ????????????????????lock.wait(waitTime);

        ????????????????????//?結(jié)果不為空直接返回
        ????????????????????if?(data?!=?null)?{
        ????????????????????????break;
        ????????????????????}
        ????????????????????//?被喚醒后計算運(yùn)行時長
        ????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????????if?(data?==?null)?{
        ????????????????throw?new?RuntimeException("超時未獲取到結(jié)果");
        ????????????}
        ????????????return?data;
        ????????}
        ????}

        ????public?void?sendData(MyData?data)?{
        ????????synchronized?(lock)?{
        ????????????this.data?=?data;
        ????????????lock.notifyAll();
        ????????}
        ????}
        }

        /**
        ?*?保護(hù)性暫停實(shí)例三
        ?*
        ?*?@author?微信公眾號「JAVA前線」
        ?*/

        public?class?ProtectDesignTest3?{

        ????public?static?void?main(String[]?args)?{
        ????????Resource3?resource?=?new?Resource3();
        ????????new?Thread(()?->?{
        ????????????try?{
        ????????????????MyData?data?=?new?MyData("hello");
        ????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
        ????????????????//?模擬發(fā)送耗時
        ????????????????TimeUnit.SECONDS.sleep(3);
        ????????????????resource.sendData(data);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????},?"t1").start();

        ????????new?Thread(()?->?{
        ????????????MyData?data?=?resource.getData(1000);
        ????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
        ????????},?"t2").start();
        ????}
        }

        4 加一個編號

        現(xiàn)在再來設(shè)想一個場景:現(xiàn)在有三個生產(chǎn)數(shù)據(jù)的線程1、2、3,三個獲取數(shù)據(jù)的線程4、5、6,我們希望每個獲取數(shù)據(jù)線程都只拿到其中一個生產(chǎn)線程的數(shù)據(jù),不能多拿也不能少拿。

        這里引入一個Futures模型,這個模型為每個資源進(jìn)行編號并存儲在容器中,例如線程1生產(chǎn)的數(shù)據(jù)被拿走則從容器中刪除,一直到容器為空結(jié)束。

        @Getter
        @Setter
        public?class?MyNewData?implements?Serializable?{
        ????private?static?final?long?serialVersionUID?=?1L;
        ????private?static?final?AtomicLong?ID?=?new?AtomicLong(0);
        ????private?Long?id;
        ????private?String?message;

        ????public?MyNewData(String?message)?{
        ????????this.id?=?newId();
        ????????this.message?=?message;
        ????}

        ????/**
        ?????*?自增到最大值會回到最小值(負(fù)值可以作為識別ID)
        ?????*/

        ????private?static?long?newId()?{
        ????????return?ID.getAndIncrement();
        ????}

        ????public?Long?getId()?{
        ????????return?this.id;
        ????}
        }

        class?MyResource?{
        ????private?MyNewData?data;
        ????private?Object?lock?=?new?Object();

        ????public?MyNewData?getData(int?timeOut)?{
        ????????synchronized?(lock)?{
        ????????????long?timePassed?=?0;
        ????????????long?begin?=?System.currentTimeMillis();
        ????????????while?(data?==?null)?{
        ????????????????try?{
        ????????????????????if?(timePassed?>?timeOut)?{
        ????????????????????????break;
        ????????????????????}
        ????????????????????long?waitTime?=?timeOut?-?timePassed;
        ????????????????????lock.wait(waitTime);
        ????????????????????if?(data?!=?null)?{
        ????????????????????????break;
        ????????????????????}
        ????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????????if?(data?==?null)?{
        ????????????????throw?new?RuntimeException("超時未獲取到結(jié)果");
        ????????????}
        ????????????return?data;
        ????????}
        ????}

        ????public?void?sendData(MyNewData?data)?{
        ????????synchronized?(lock)?{
        ????????????this.data?=?data;
        ????????????lock.notifyAll();
        ????????}
        ????}
        }

        class?MyFutures?{
        ????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();

        ????public?static?MyResource?newResource(MyNewData?data)?{
        ????????final?MyResource?future?=?new?MyResource();
        ????????FUTURES.put(data.getId(),?future);
        ????????return?future;
        ????}

        ????public?static?MyResource?getResource(Long?id)?{
        ????????return?FUTURES.remove(id);
        ????}

        ????public?static?Set?getIds()?{
        ????????return?FUTURES.keySet();
        ????}
        }


        /**
        ?*?保護(hù)性暫停實(shí)例四
        ?*
        ?*?@author?微信公眾號「JAVA前線」
        ?*/

        public?class?ProtectDesignTest4?{

        ????public?static?void?main(String[]?args)?throws?Exception?{
        ????????for?(int?i?=?0;?i?3;?i++)?{
        ????????????final?int?index?=?i;
        ????????????new?Thread(()?->?{
        ????????????????try?{
        ????????????????????MyNewData?data?=?new?MyNewData("hello_"?+?index);
        ????????????????????MyResource?resource?=?MyFutures.newResource(data);
        ????????????????????//?模擬發(fā)送耗時
        ????????????????????TimeUnit.SECONDS.sleep(1);
        ????????????????????resource.sendData(data);
        ????????????????????System.out.println("生產(chǎn)數(shù)據(jù)data="?+?data);
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}

        ????????????}).start();
        ????????}
        ????????TimeUnit.SECONDS.sleep(1);

        ????????for?(Long?i?:?MyFutures.getIds())?{
        ????????????final?long?index?=?i;
        ????????????new?Thread(()?->?{
        ????????????????MyResource?resource?=?MyFutures.getResource(index);
        ????????????????int?timeOut?=?3000;
        ????????????????System.out.println("接收數(shù)據(jù)data="?+?resource.getData(timeOut));
        ????????????}).start();
        ????????}
        ????}
        }

        5 DUBBO應(yīng)用實(shí)例

        我們順著這一個鏈路跟蹤代碼:消費(fèi)者發(fā)送請求 > 提供者接收請求并執(zhí)行,并且將運(yùn)行結(jié)果發(fā)送給消費(fèi)者 >消費(fèi)者接收結(jié)果。

        (1) 消費(fèi)者發(fā)送請求

        消費(fèi)者發(fā)送的數(shù)據(jù)包含請求ID,并且將關(guān)系維護(hù)進(jìn)FUTURES容器

        final?class?HeaderExchangeChannel?implements?ExchangeChannel?{

        ????@Override
        ????public?ResponseFuture?request(Object?request,?int?timeout)?throws?RemotingException?{
        ????????if?(closed)?{
        ????????????throw?new?RemotingException(this.getLocalAddress(),?null,?"Failed?to?send?request?"?+?request?+?",?cause:?The?channel?"?+?this?+?"?is?closed!");
        ????????}
        ????????Request?req?=?new?Request();
        ????????req.setVersion(Version.getProtocolVersion());
        ????????req.setTwoWay(true);
        ????????req.setData(request);
        ????????//?代碼1
        ????????DefaultFuture?future?=?DefaultFuture.newFuture(channel,?req,?timeout);
        ????????try?{
        ????????????channel.send(req);
        ????????}?catch?(RemotingException?e)?{
        ????????????future.cancel();
        ????????????throw?e;
        ????????}
        ????????return?future;
        ????}
        }

        class?DefaultFuture?implements?ResponseFuture?{

        ????//?FUTURES容器
        ????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();

        ????private?DefaultFuture(Channel?channel,?Request?request,?int?timeout)?{
        ????????this.channel?=?channel;
        ????????this.request?=?request;
        ????????//?請求ID
        ????????this.id?=?request.getId();
        ????????this.timeout?=?timeout?>?0???timeout?:?channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,?Constants.DEFAULT_TIMEOUT);
        ????????FUTURES.put(id,?this);
        ????????CHANNELS.put(id,?channel);
        ????}
        }

        (2) 提供者接收請求并執(zhí)行,并且將運(yùn)行結(jié)果發(fā)送給消費(fèi)者


        public?class?HeaderExchangeHandler?implements?ChannelHandlerDelegate?{

        ????void?handleRequest(final?ExchangeChannel?channel,?Request?req)?throws?RemotingException?{
        ????????//?response與請求ID對應(yīng)
        ????????Response?res?=?new?Response(req.getId(),?req.getVersion());
        ????????if?(req.isBroken())?{
        ????????????Object?data?=?req.getData();
        ????????????String?msg;
        ????????????if?(data?==?null)?{
        ????????????????msg?=?null;
        ????????????}?else?if?(data?instanceof?Throwable)?{
        ????????????????msg?=?StringUtils.toString((Throwable)?data);
        ????????????}?else?{
        ????????????????msg?=?data.toString();
        ????????????}
        ????????????res.setErrorMessage("Fail?to?decode?request?due?to:?"?+?msg);
        ????????????res.setStatus(Response.BAD_REQUEST);
        ????????????channel.send(res);
        ????????????return;
        ????????}
        ????????//?message?=?RpcInvocation包含方法名、參數(shù)名、參數(shù)值等
        ????????Object?msg?=?req.getData();
        ????????try?{

        ????????????//?DubboProtocol.reply執(zhí)行實(shí)際業(yè)務(wù)方法
        ????????????CompletableFuture?future?=?handler.reply(channel,?msg);

        ????????????//?如果請求已經(jīng)完成則發(fā)送結(jié)果
        ????????????if?(future.isDone())?{
        ????????????????res.setStatus(Response.OK);
        ????????????????res.setResult(future.get());
        ????????????????channel.send(res);
        ????????????????return;
        ????????????}
        ????????}?catch?(Throwable?e)?{
        ????????????res.setStatus(Response.SERVICE_ERROR);
        ????????????res.setErrorMessage(StringUtils.toString(e));
        ????????????channel.send(res);
        ????????}
        ????}
        }

        (3) 消費(fèi)者接收結(jié)果

        以下DUBBO源碼很好體現(xiàn)了保護(hù)性暫停這個設(shè)計模式,說明參看注釋

        class?DefaultFuture?implements?ResponseFuture?{
        ????private?final?Lock?lock?=?new?ReentrantLock();
        ????private?final?Condition?done?=?lock.newCondition();

        ????public?static?void?received(Channel?channel,?Response?response)?{
        ????????try?{
        ????????????//?取出對應(yīng)的請求對象
        ????????????DefaultFuture?future?=?FUTURES.remove(response.getId());
        ????????????if?(future?!=?null)?{
        ????????????????future.doReceived(response);
        ????????????}?else?{
        ????????????????logger.warn("The?timeout?response?finally?returned?at?"
        ????????????????????????????+?(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS").format(new?Date()))
        ????????????????????????????+?",?response?"?+?response
        ????????????????????????????+?(channel?==?null???""?:?",?channel:?"?+?channel.getLocalAddress()
        ???????????????????????????????+?"?->?"?+?channel.getRemoteAddress()));
        ????????????}
        ????????}?finally?{
        ????????????CHANNELS.remove(response.getId());
        ????????}
        ????}


        ????@Override
        ????public?Object?get(int?timeout)?throws?RemotingException?{
        ????????if?(timeout?<=?0)?{
        ????????????timeout?=?Constants.DEFAULT_TIMEOUT;
        ????????}
        ????????if?(!isDone())?{
        ????????????long?start?=?System.currentTimeMillis();
        ????????????lock.lock();
        ????????????try?{
        ????????????????while?(!isDone())?{

        ????????????????????//?放棄鎖并使當(dāng)前線程阻塞,直到發(fā)出信號中斷它或者達(dá)到超時時間
        ????????????????????done.await(timeout,?TimeUnit.MILLISECONDS);

        ????????????????????//?阻塞結(jié)束后再判斷是否完成
        ????????????????????if?(isDone())?{
        ????????????????????????break;
        ????????????????????}

        ????????????????????//?阻塞結(jié)束后判斷是否超時
        ????????????????????if(System.currentTimeMillis()?-?start?>?timeout)?{
        ????????????????????????break;
        ????????????????????}
        ????????????????}
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????throw?new?RuntimeException(e);
        ????????????}?finally?{
        ????????????????lock.unlock();
        ????????????}
        ????????????//?response對象仍然為空則拋出超時異常
        ????????????if?(!isDone())?{
        ????????????????throw?new?TimeoutException(sent?>?0,?channel,?getTimeoutMessage(false));
        ????????????}
        ????????}
        ????????return?returnFromResponse();
        ????}

        ????private?void?doReceived(Response?res)?{
        ????????lock.lock();
        ????????try?{
        ????????????//?接收到服務(wù)器響應(yīng)賦值response
        ????????????response?=?res;
        ????????????if?(done?!=?null)?{
        ????????????????//?喚醒get方法中處于等待的代碼塊
        ????????????????done.signal();
        ????????????}
        ????????}?finally?{
        ????????????lock.unlock();
        ????????}
        ????????if?(callback?!=?null)?{
        ????????????invokeCallback(callback);
        ????????}
        ????}
        }

        6 文章總結(jié)

        本文我們從基礎(chǔ)案例介紹保護(hù)性暫?;靖拍詈蛯?shí)踐,最終分析DUBBO源碼中保護(hù)性暫停設(shè)計模式使用場景。我們在設(shè)計并發(fā)框架時要注意虛假喚醒問題,以及請求和響應(yīng)關(guān)系對應(yīng)問題,希望本文對大家有所幫助。



        JAVA前線?


        互聯(lián)網(wǎng)技術(shù)人思考與分享,歡迎長按關(guān)注


        瀏覽 24
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
            
            

                      • 久久免费少妇视频 | 波多野42部无码喷潮在线 | 91久久久久久久久18 | 中文字幕日韩有码 | 大尺度人体私拍裸体偷拍 |