長文詳解:DUBBO源碼使用了哪些設(shè)計模式
JAVA前線?
歡迎大家關(guān)注公眾號「JAVA前線」查看更多精彩分享,主要包括源碼分析、實際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)
0 文章概述
DUBBO作為RPC領(lǐng)域優(yōu)秀開源的框架在業(yè)界十分流行,本文我們閱讀其源碼并對其使用到的設(shè)計模式進行分析。需要說明的是本文所說的設(shè)計模式更加廣義,不僅包括標(biāo)準(zhǔn)意義上23種設(shè)計模式,還有一些常見經(jīng)過檢驗的代碼模式例如雙重檢查鎖模式、多線程保護性暫停模式等等。

1 模板方法
模板方法模式定義一個操作中的算法骨架,一般使用抽象類定義算法骨架。抽象類同時定義一些抽象方法,這些抽象方法延遲到子類實現(xiàn),這樣子類不僅遵守了算法骨架約定,也實現(xiàn)了自己的算法。既保證了規(guī)約也兼顧靈活性。這就是用抽象構(gòu)建框架,用實現(xiàn)擴展細(xì)節(jié)。
DUBBO源碼中有一個非常重要的核心概念I(lǐng)nvoker,我們可以理解為執(zhí)行器或者說一個可執(zhí)行對象,能夠根據(jù)方法的名稱、參數(shù)得到相應(yīng)執(zhí)行結(jié)果,這個特性體現(xiàn)了代理模式我們后面章節(jié)再說,本章節(jié)我們先分析其中的模板方法模式。
public?abstract?class?AbstractInvoker<T>?implements?Invoker<T>?{
????@Override
????public?Result?invoke(Invocation?inv)?throws?RpcException?{
????????RpcInvocation?invocation?=?(RpcInvocation)?inv;
????????invocation.setInvoker(this);
????????if?(attachment?!=?null?&&?attachment.size()?>?0)?{
????????????invocation.addAttachmentsIfAbsent(attachment);
????????}
????????Map?contextAttachments?=?RpcContext.getContext().getAttachments();
????????if?(contextAttachments?!=?null?&&?contextAttachments.size()?!=?0)?{
????????????invocation.addAttachments(contextAttachments);
????????}
????????if?(getUrl().getMethodParameter(invocation.getMethodName(),?Constants.ASYNC_KEY,?false))?{
????????????invocation.setAttachment(Constants.ASYNC_KEY,?Boolean.TRUE.toString());
????????}
????????RpcUtils.attachInvocationIdIfAsync(getUrl(),?invocation);
????????try?{
????????????return?doInvoke(invocation);
????????}?catch?(InvocationTargetException?e)?{
????????????Throwable?te?=?e.getTargetException();
????????????if?(te?==?null)?{
????????????????return?new?RpcResult(e);
????????????}?else?{
????????????????if?(te?instanceof?RpcException)?{
????????????????????((RpcException)?te).setCode(RpcException.BIZ_EXCEPTION);
????????????????}
????????????????return?new?RpcResult(te);
????????????}
????????}?catch?(RpcException?e)?{
????????????if?(e.isBiz())?{
????????????????return?new?RpcResult(e);
????????????}?else?{
????????????????throw?e;
????????????}
????????}?catch?(Throwable?e)?{
????????????return?new?RpcResult(e);
????????}
????}
????protected?abstract?Result?doInvoke(Invocation?invocation)?throws?Throwable;
}
AbstractInvoker作為抽象父類定義了invoke方法這個方法骨架,并且定義了doInvoke抽象方法供子類擴展,例如子類InjvmInvoker、DubboInvoker各自實現(xiàn)了doInvoke方法。
InjvmInvoker是本地引用,調(diào)用時直接從本地暴露生產(chǎn)者容器獲取生產(chǎn)者Exporter對象即可。
class?InjvmInvoker<T>?extends?AbstractInvoker<T>?{
????@Override
????public?Result?doInvoke(Invocation?invocation)?throws?Throwable?{
????????Exporter>?exporter?=?InjvmProtocol.getExporter(exporterMap,?getUrl());
????????if?(exporter?==?null)?{
????????????throw?new?RpcException("Service?["?+?key?+?"]?not?found.");
????????}
????????RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE,?0);
????????return?exporter.getInvoker().invoke(invocation);
????}
}
DubboInvoker相對復(fù)雜一些,需要考慮同步異步調(diào)用方式,配置優(yōu)先級,遠(yuǎn)程通信等等。
public?class?DubboInvoker<T>?extends?AbstractInvoker<T>?{
????@Override
????protected?Result?doInvoke(final?Invocation?invocation)?throws?Throwable?{
????????RpcInvocation?inv?=?(RpcInvocation)?invocation;
????????final?String?methodName?=?RpcUtils.getMethodName(invocation);
????????inv.setAttachment(Constants.PATH_KEY,?getUrl().getPath());
????????inv.setAttachment(Constants.VERSION_KEY,?version);
????????ExchangeClient?currentClient;
????????if?(clients.length?==?1)?{
????????????currentClient?=?clients[0];
????????}?else?{
????????????currentClient?=?clients[index.getAndIncrement()?%?clients.length];
????????}
????????try?{
????????????boolean?isAsync?=?RpcUtils.isAsync(getUrl(),?invocation);
????????????boolean?isAsyncFuture?=?RpcUtils.isReturnTypeFuture(inv);
????????????boolean?isOneway?=?RpcUtils.isOneway(getUrl(),?invocation);
????????????//?超時時間方法級別配置優(yōu)先級最高
????????????int?timeout?=?getUrl().getMethodParameter(methodName,?Constants.TIMEOUT_KEY,?Constants.DEFAULT_TIMEOUT);
????????????if?(isOneway)?{
????????????????boolean?isSent?=?getUrl().getMethodParameter(methodName,?Constants.SENT_KEY,?false);
????????????????currentClient.send(inv,?isSent);
????????????????RpcContext.getContext().setFuture(null);
????????????????return?new?RpcResult();
????????????}?else?if?(isAsync)?{
????????????????ResponseFuture?future?=?currentClient.request(inv,?timeout);
????????????????FutureAdapter2 動態(tài)代理
代理模式核心是為一個目標(biāo)對象提供一個代理,以控制對這個對象的訪問,我們可以通過代理對象訪問目標(biāo)對象,這樣可以增強目標(biāo)對象功能。
代理模式分為靜態(tài)代理與動態(tài)代理,動態(tài)代理又分為JDK代理和Cglib代理,JDK代理只能代理實現(xiàn)類接口的目標(biāo)對象,但是Cglib沒有這種要求。
2.1 JDK動態(tài)代理
動態(tài)代理本質(zhì)是通過生成字節(jié)碼的方式將代理對象織入目標(biāo)對象,本文以JDK動態(tài)代理為例。
第一步定義業(yè)務(wù)方法,即被代理的目標(biāo)對象:
public?interface?StudentJDKService?{
????public?void?addStudent(String?name);
????public?void?updateStudent(String?name);
}
public?class?StudentJDKServiceImpl?implements?StudentJDKService?{
????@Override
????public?void?addStudent(String?name)?{
????????System.out.println("add?student="?+?name);
????}
????@Override
????public?void?updateStudent(String?name)?{
????????System.out.println("update?student="?+?name);
????}
}
第二步定義一個事務(wù)代理對象:
public?class?TransactionInvocationHandler?implements?InvocationHandler?{
????private?Object?target;
????public?TransactionInvocationHandler(Object?target)?{
????????this.target?=?target;
????}
????@Override
????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
????????System.out.println("------前置通知------");
????????Object?rs?=?method.invoke(target,?args);
????????System.out.println("------后置通知------");
????????return?rs;
????}
}
第三步定義代理工廠:
public?class?ProxyFactory?{
????public?Object?getProxy(Object?target,?InvocationHandler?handler)?{
????????ClassLoader?loader?=?this.getClass().getClassLoader();
????????Class>[]?interfaces?=?target.getClass().getInterfaces();
????????Object?proxy?=?Proxy.newProxyInstance(loader,?interfaces,?handler);
????????return?proxy;
????}
}
第四步進行測試:
public?class?ZTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????testSimple();
????}
????public?static?void?testSimple()?{
????????StudentJDKService?target?=?new?StudentJDKServiceImpl();
????????TransactionInvocationHandler?handler?=?new?TransactionInvocationHandler(target);
????????ProxyFactory?proxyFactory?=?new?ProxyFactory();
????????Object?proxy?=?proxyFactory.getProxy(target,?handler);
????????StudentJDKService?studentService?=?(StudentJDKService)?proxy;
????????studentService.addStudent("JAVA前線");
????}
}
ProxyGenerator.generateProxyClass是生成字節(jié)碼文件核心方法,我們看一看動態(tài)字節(jié)碼到底如何定義:
public?class?ZTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????createProxyClassFile();
????}
????public?static?void?createProxyClassFile()?{
????????String?name?=?"Student$Proxy";
????????byte[]?data?=?ProxyGenerator.generateProxyClass(name,?new?Class[]?{?StudentJDKService.class?});
????????FileOutputStream?out?=?null;
????????try?{
????????????String?fileName?=?"c:/test/"?+?name?+?".class";
????????????File?file?=?new?File(fileName);
????????????out?=?new?FileOutputStream(file);
????????????out.write(data);
????????}?catch?(Exception?e)?{
????????????System.out.println(e.getMessage());
????????}?finally?{
????????????if?(null?!=?out)?{
????????????????try?{
????????????????????out.close();
????????????????}?catch?(IOException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
}
最終生成字節(jié)碼文件如下,我們看到代理對象被織入了目標(biāo)對象:
import?com.xpz.dubbo.simple.jdk.StudentJDKService;
import?java.lang.reflect.InvocationHandler;
import?java.lang.reflect.Method;
import?java.lang.reflect.Proxy;
import?java.lang.reflect.UndeclaredThrowableException;
public?final?class?Student$Proxy?extends?Proxy?implements?StudentJDKService?{
????private?static?Method?m1;
????private?static?Method?m2;
????private?static?Method?m4;
????private?static?Method?m3;
????private?static?Method?m0;
????public?Student$Proxy(InvocationHandler?paramInvocationHandler)?{
????????super(paramInvocationHandler);
????}
????public?final?boolean?equals(Object?paramObject)?{
????????try?{
????????????return?((Boolean)this.h.invoke(this,?m1,?new?Object[]?{?paramObject?})).booleanValue();
????????}?catch?(Error?|?RuntimeException?error)?{
????????????throw?null;
????????}?catch?(Throwable?throwable)?{
????????????throw?new?UndeclaredThrowableException(throwable);
????????}
????}
????public?final?String?toString()?{
????????try?{
????????????return?(String)this.h.invoke(this,?m2,?null);
????????}?catch?(Error?|?RuntimeException?error)?{
????????????throw?null;
????????}?catch?(Throwable?throwable)?{
????????????throw?new?UndeclaredThrowableException(throwable);
????????}
????}
????public?final?void?updateStudent(String?paramString)?{
????????try?{
????????????this.h.invoke(this,?m4,?new?Object[]?{?paramString?});
????????????return;
????????}?catch?(Error?|?RuntimeException?error)?{
????????????throw?null;
????????}?catch?(Throwable?throwable)?{
????????????throw?new?UndeclaredThrowableException(throwable);
????????}
????}
????public?final?void?addStudent(String?paramString)?{
????????try?{
????????????this.h.invoke(this,?m3,?new?Object[]?{?paramString?});
????????????return;
????????}?catch?(Error?|?RuntimeException?error)?{
????????????throw?null;
????????}?catch?(Throwable?throwable)?{
????????????throw?new?UndeclaredThrowableException(throwable);
????????}
????}
????public?final?int?hashCode()?{
????????try?{
????????????return?((Integer)this.h.invoke(this,?m0,?null)).intValue();
????????}?catch?(Error?|?RuntimeException?error)?{
????????????throw?null;
????????}?catch?(Throwable?throwable)?{
????????????throw?new?UndeclaredThrowableException(throwable);
????????}
????}
????static?{
????????try?{
????????????m1?=?Class.forName("java.lang.Object").getMethod("equals",?new?Class[]?{?Class.forName("java.lang.Object")?});
????????????m2?=?Class.forName("java.lang.Object").getMethod("toString",?new?Class[0]);
????????????m4?=?Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent",?new?Class[]?{?Class.forName("java.lang.String")?});
????????????m3?=?Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent",?new?Class[]?{?Class.forName("java.lang.String")?});
????????????m0?=?Class.forName("java.lang.Object").getMethod("hashCode",?new?Class[0]);
????????????return;
????????}?catch?(NoSuchMethodException?noSuchMethodException)?{
????????????throw?new?NoSuchMethodError(noSuchMethodException.getMessage());
????????}?catch?(ClassNotFoundException?classNotFoundException)?{
????????????throw?new?NoClassDefFoundError(classNotFoundException.getMessage());
????????}
????}
}
2.2 DUBBO源碼應(yīng)用
那么在DUBBO源碼中動態(tài)代理是如何體現(xiàn)的呢?我們知道消費者在消費方法時實際上執(zhí)行的代理方法,這是消費者在refer時生成的代理方法。
代理工廠AbstractProxyFactory有兩個子類:
JdkProxyFactory
JavassistProxyFactory
通過下面源碼我們可以分析得到,DUBBO通過InvokerInvocationHandler對象代理了invoker對象:
public?class?JdkProxyFactory?extends?AbstractProxyFactory?{
????@Override
????public??T?getProxy(Invoker?invoker,?Class>[]?interfaces) ?{
????????return?(T)?Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),?interfaces,?new?InvokerInvocationHandler(invoker));
????}
}
public?class?JavassistProxyFactory?extends?AbstractProxyFactory?{
????@Override
????public??T?getProxy(Invoker?invoker,?Class>[]?interfaces) ?{
????????return?(T)?Proxy.getProxy(interfaces).newInstance(new?InvokerInvocationHandler(invoker));
????}
}
InvokerInvocationHandler將參數(shù)信息封裝至RpcInvocation進行傳遞:
public?class?InvokerInvocationHandler?implements?InvocationHandler?{
????private?final?Invoker>?invoker;
????public?InvokerInvocationHandler(Invoker>?handler)?{
????????this.invoker?=?handler;
????}
????@Override
????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
????????String?methodName?=?method.getName();
????????Class>[]?parameterTypes?=?method.getParameterTypes();
????????if?(method.getDeclaringClass()?==?Object.class)?{
????????????return?method.invoke(invoker,?args);
????????}
????????if?("toString".equals(methodName)?&&?parameterTypes.length?==?0)?{
????????????return?invoker.toString();
????????}
????????if?("hashCode".equals(methodName)?&&?parameterTypes.length?==?0)?{
????????????return?invoker.hashCode();
????????}
????????if?("equals".equals(methodName)?&&?parameterTypes.length?==?1)?{
????????????return?invoker.equals(args[0]);
????????}
????????//?RpcInvocation?[methodName=sayHello,?parameterTypes=[class?java.lang.String],?arguments=[JAVA前線],?attachments={}]
????????RpcInvocation?rpcInvocation?=?createInvocation(method,?args);
????????return?invoker.invoke(rpcInvocation).recreate();
????}
????private?RpcInvocation?createInvocation(Method?method,?Object[]?args)?{
????????RpcInvocation?invocation?=?new?RpcInvocation(method,?args);
????????if?(RpcUtils.hasFutureReturnType(method))?{
????????????invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY,?"true");
????????????invocation.setAttachment(Constants.ASYNC_KEY,?"true");
????????}
????????return?invocation;
????}
}
3 策略模式
在1995年出版的《設(shè)計模式:可復(fù)用面向?qū)ο筌浖幕A(chǔ)》給出了策略模式定義:
Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it
定義一系列算法,封裝每一個算法,并使它們可以互換。策略模式可以使算法的變化獨立于使用它們的客戶端代碼。
在設(shè)計模式原則中有一條開閉原則:對擴展開放,對修改關(guān)閉,我認(rèn)為這是設(shè)計模式中最重要設(shè)計原則原因如下:
(1) 當(dāng)需求變化時應(yīng)該通過擴展而不是通過修改已有代碼來實現(xiàn)變化,這樣就保證代碼的穩(wěn)定性,避免牽一發(fā)而動全身
(2) 擴展也不是隨意擴展,因為事先定義了算法,擴展也是根據(jù)算法擴展,體現(xiàn)了用抽象構(gòu)建框架,用實現(xiàn)擴展細(xì)節(jié)
(3) 標(biāo)準(zhǔn)意義的二十三種設(shè)計模式說到底最終都是在遵循開閉原則
3.1 策略模式實例
假設(shè)我們現(xiàn)在需要解析一段文本,這段文本有可能是HTML也有可能是TEXT,如果不使用策略模式應(yīng)該怎么寫呢?
public?enum?DocTypeEnum?{
????HTML(1,?"HTML"),
????TEXT(2,?"TEXT");
????private?int?value;
????private?String?description;
????private?DocTypeEnum(int?value,?String?description)?{
????????this.value?=?value;
????????this.description?=?description;
????}
????
????public?int?value()?{??
????????return?value;??
????}????
}
public?class?ParserManager?{
????public?void?parse(Integer?docType,?String?content)?{
????????//?文本類型是HTML
????????if(docType?==?DocTypeEnum.HTML.getValue())?{
????????????//?解析邏輯
????????}
????????//?文本類型是TEXT
????????else?if(docType?==?DocTypeEnum.TEXT.getValue())?{
????????????//?解析邏輯
????????}
????}
}
這種寫法功能上沒有問題,但是當(dāng)本文類型越來越多時,那么parse方法將會越來越冗余和復(fù)雜,if else代碼塊也會越來越多,所以我們要使用策略模式。
第一步定義業(yè)務(wù)類型和業(yè)務(wù)實體:
public?enum?DocTypeEnum?{
????HTML(1,?"HTML"),
????TEXT(2,?"TEXT");
????private?int?value;
????private?String?description;
????private?DocTypeEnum(int?value,?String?description)?{
????????this.value?=?value;
????????this.description?=?description;
????}
????public?int?value()?{
????????return?value;
????}
}
public?class?BaseModel?{
????//?公共字段
}
public?class?HtmlContentModel?extends?BaseModel?{
????//?HTML自定義字段
}
public?class?TextContentModel?extends?BaseModel?{
????//?TEXT自定義字段
}
第二步定義策略:
public?interface?Strategy<T?extends?BaseModel>?{
????public?T?parse(String?sourceContent);
}
@Service
public?class?HtmlStrategy?implements?Strategy?{
????@Override
????public?HtmlContentModel?parse(String?sourceContent)?{
????????return?new?HtmlContentModel("html");
????}
}
@Service
public?class?TextStrategy?implements?Strategy?{
????@Override
????public?TextContentModel?parse(String?sourceContent)?{
????????return?new?TextContentModel("text");
????}
}
第三步定義策略工廠:
@Service
public?class?StrategyFactory?implements?InitializingBean?{
????
????private?Map?strategyMap?=?new?HashMap<>();??
????@Resource
????private?Strategy?htmlStrategy?;
????@Resource
????private?Strategy?textStrategy?;
????@Override
???public?void?afterPropertiesSet()?throws?Exception??{
???????strategyMap.put(RechargeTypeEnum.HTML.value(),?htmlStrategy);???
???????strategyMap.put(RechargeTypeEnum.TEXT.value(),textStrategy);
???}
???public?Strategy?getStrategy(int?type)?{
???????return?strategyMap.get(type);
???}
}?
第四步定義策略執(zhí)行器:
@Service
public?class?StrategyExecutor<T?extends?BaseModel>?{
????@Resource
????private?StrategyFactory?strategyFactory;
????public?T?parse(String?sourceContent,?Integer?type)?{
????????Strategy?strategy?=?StrategyFactory.getStrategy(type);
????????return?strategy.parse(sourceContent);
????}
}
第五步執(zhí)行測試用例:
public?class?Test?{
????@Resource
????private?StrategyExecutor??executor;
????@Test
????public?void?test()?{
????????//?解析HTML
????????HtmlContentModel?content1?=?(HtmlContentModel)?executor.parse("測試內(nèi)容",??DocTypeEnum.HTML.value());
????????System.out.println(content1);
????????//?解析TEXT
????????TextContentModel?content2?=?(TextContentModel)executor.calRecharge("測試內(nèi)容",??DocTypeEnum.TEXT.value());
????????System.out.println(content2);
????}
}
如果新增文本類型我們再擴展新策略即可。我們再回顧策略模式定義會有更深的體會:定義一系列算法,封裝每一個算法,并使它們可以互換。策略模式可以使算法的變化獨立于使用它們的客戶端代碼。
3.2 DUBBO源碼應(yīng)用
在上述實例中我們將策略存儲在map容器,我們思考一下還有沒有其它地方可以存儲策略?答案是配置文件。下面就要介紹SPI機制,我認(rèn)為這個機制在廣義上實現(xiàn)了策略模式。
SPI(Service Provider Interface)是一種服務(wù)發(fā)現(xiàn)機制,本質(zhì)是將接口實現(xiàn)類的全限定名配置在文件中,并由服務(wù)加載器讀取配置文件加載實現(xiàn)類,這樣可以在運行時動態(tài)為接口替換實現(xiàn)類,我們通過SPI機制可以為程序提供拓展功能。
3.2.1 JDK SPI
我們首先分析JDK自身SPI機制,定義一個數(shù)據(jù)驅(qū)動接口并提供兩個驅(qū)動實現(xiàn),最后通過serviceLoader加載驅(qū)動。
(1) 新建DataBaseDriver工程并定義接口
public?interface?DataBaseDriver?{
????String?connect(String?hostIp);
}
(2) 打包這個工程為JAR
<dependency>
??<groupId>com.javafont.spigroupId>
??<artifactId>DataBaseDriverartifactId>
??<version>1.0.0-SNAPSHOTversion>
dependency>
(3) 新建MySQLDriver工程引用上述依賴并實現(xiàn)DataBaseDriver接口
import?com.javafont.database.driver.DataBaseDriver;
public?class?MySQLDataBaseDriver?implements?DataBaseDriver?{
????@Override
????public?String?connect(String?hostIp)?{
????????return?"MySQL?DataBase?Driver?connect";
????}
}
(4) 在MySQLDriver項目新建文件
src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver
(5) 在上述文件新增如下內(nèi)容
com.javafont.database.mysql.driver.MySQLDataBaseDriver
(6) 按照上述相同步驟創(chuàng)建工程OracleDriver
(7) 打包上述兩個項目
<dependency>
??<groupId>com.javafont.spigroupId>
??<artifactId>MySQLDriverartifactId>
??<version>1.0.0-SNAPSHOTversion>
dependency>
<dependency>
??<groupId>com.javafont.spigroupId>
??<artifactId>OracleDriverartifactId>
??<version>1.0.0-SNAPSHOTversion>
dependency>
(8) 新建測試項目引入上述MySQLDriver、OracleDriver
public?class?DataBaseConnector?{
????public?static?void?main(String[]?args)?{
????????ServiceLoader?serviceLoader?=?ServiceLoader.load(DataBaseDriver.class);
????????Iterator?iterator?=?serviceLoader.iterator();
????????while?(iterator.hasNext())?{
????????????DataBaseDriver?driver?=?iterator.next();
????????????System.out.println(driver.connect("localhost"));
????????}
????}
}
//?輸出結(jié)果
//?MySQL?DataBase?Driver?connect
//?Oracle?DataBase?Driver?connect
我們并沒有指定使用哪個驅(qū)動連接數(shù)據(jù)庫,而是通過ServiceLoader方式加載所有實現(xiàn)了DataBaseDriver接口的實現(xiàn)類。假設(shè)我們只需要使用MySQL數(shù)據(jù)庫驅(qū)動那么直接引入相應(yīng)依賴即可。
3.2.2 DUBBO SPI
我們發(fā)現(xiàn)JDK SPI機制還是有一些不完善之處:例如通過ServiceLoader會加載所有實現(xiàn)了某個接口的實現(xiàn)類,但是不能通過一個key去指定獲取哪一個實現(xiàn)類,但是DUBBO自己實現(xiàn)的SPI機制解決了這個問題。
例如Protocol接口有如下實現(xiàn)類:
org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
我們現(xiàn)在將這些類配置信息在配置文件,配置文件在如下目錄:
META-INF/services/
META-INF/dubbo/
META-INF/dubbo/internal/
配置方式和JDK SPI方式配置不一樣,每個實現(xiàn)類都有key與之對應(yīng):
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
使用時通過擴展點方式加載實現(xiàn)類:
public?class?ReferenceConfig<T>?extends?AbstractReferenceConfig?{
????private?static?final?Protocol?refprotocol?=?ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
????private?T?createProxy(Map?map) ?{
????????if?(isJvmRefer)?{
????????????URL?url?=?new?URL(Constants.LOCAL_PROTOCOL,?Constants.LOCALHOST_VALUE,?0,?interfaceClass.getName()).addParameters(map);
????????????invoker?=?refprotocol.refer(interfaceClass,?url);
????????????if?(logger.isInfoEnabled())?{
????????????????logger.info("Using?injvm?service?"?+?interfaceClass.getName());
????????????}
????????}
????}
}
getAdaptiveExtension()是加載自適應(yīng)擴展點,javassist會為自適應(yīng)擴展點生成動態(tài)代碼:
public?class?Protocol$Adaptive?implements?org.apache.dubbo.rpc.Protocol?{
????public?org.apache.dubbo.rpc.Invoker?refer(java.lang.Class?arg0,?org.apache.dubbo.common.URL?arg1)?throws?org.apache.dubbo.rpc.RpcException?{
????????if?(arg1?==?null)
????????????throw?new?IllegalArgumentException("url?==?null");
????????org.apache.dubbo.common.URL?url?=?arg1;
????????String?extName?=?(url.getProtocol()?==?null???"dubbo"?:?url.getProtocol());
????????if?(extName?==?null)
????????????throw?new?IllegalStateException("Fail?to?get?extension(org.apache.dubbo.rpc.Protocol)?name?from?url("?+?url.toString()?+?")?use?keys([protocol])");
????????org.apache.dubbo.rpc.Protocol?extension?=?(org.apache.dubbo.rpc.Protocol)?ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
????????return?extension.refer(arg0,?arg1);
????}
}
extension對象就是根據(jù)url中protocol屬性等于injvm最終加載InjvmProtocol對象,動態(tài)獲取到了我們制定的業(yè)務(wù)對象,所以我認(rèn)為SPI體現(xiàn)了策略模式。
4 裝飾器模式
裝飾器模式可以動態(tài)將責(zé)任附加到對象上,在不改變原始類接口情況下,對原始類功能進行增強,并且支持多個裝飾器的嵌套使用。實現(xiàn)裝飾器模式需要以下組件:
(1) Component(抽象構(gòu)件)
核心業(yè)務(wù)抽象:可以使用接口或者抽象類
(2) ConcreteComponent(具體構(gòu)件)
實現(xiàn)核心業(yè)務(wù):最終執(zhí)行的業(yè)務(wù)代碼
(3) Decorator(抽象裝飾器)
抽象裝飾器類:實現(xiàn)Component并且組合一個Component對象
(4) ConcreteDecorator(具體裝飾器)
具體裝飾內(nèi)容:裝飾核心業(yè)務(wù)代碼
4.1 裝飾器實例
有一名足球運動員要去踢球,我們用球鞋和球襪為他裝飾一番,這樣可以使其戰(zhàn)力值增加,我們使用裝飾器模式實現(xiàn)這個實例。
(1) Component
/**
?*?抽象構(gòu)件(可以用接口替代)
?*/
public?abstract?class?Component?{
????/**
?????*?踢足球(業(yè)務(wù)核心方法)
?????*/
????public?abstract?void?playFootBall();
}
(2) ConcreteComponent
/**
?*?具體構(gòu)件
?*/
public?class?ConcreteComponent?extends?Component?{
????@Override
????public?void?playFootBall()?{
????????System.out.println("球員踢球");
????}
}
(3) Decorator
/**
?*?抽象裝飾器
?*/
public?abstract?class?Decorator?extends?Component?{
????private?Component?component?=?null;
????public?Decorator(Component?component)?{
????????this.component?=?component;
????}
????@Override
????public?void?playFootBall()?{
????????this.component.playFootBall();
????}
}
(4) ConcreteDecorator
/**
?*?球襪裝飾器
?*/
public?class?ConcreteDecoratorA?extends?Decorator?{
????public?ConcreteDecoratorA(Component?component)?{
????????super(component);
????}
????/**
?????*?定義球襪裝飾邏輯
?????*/
????private?void?decorateMethod()?{
????????System.out.println("換上球襪戰(zhàn)力值增加");
????}
????/**
?????*?重寫父類方法
?????*/
????@Override
????public?void?playFootBall()?{
????????this.decorateMethod();
????????super.playFootBall();
????}
}
/**
?*?球鞋裝飾器
?*/
public?class?ConcreteDecoratorB?extends?Decorator?{
????public?ConcreteDecoratorB(Component?component)?{
????????super(component);
????}
????/**
?????*?定義球鞋裝飾邏輯
?????*/
????private?void?decorateMethod()?{
????????System.out.println("換上球鞋戰(zhàn)力值增加");
????}
????/**
?????*?重寫父類方法
?????*/
????@Override
????public?void?playFootBall()?{
????????this.decorateMethod();
????????super.playFootBall();
????}
}
(5) 運行測試
public?class?TestDecoratorDemo?{
????public?static?void?main(String[]?args)?{
????????Component?component?=?new?ConcreteComponent();
????????component?=?new?ConcreteDecoratorA(component);
????????component?=?new?ConcreteDecoratorB(component);
????????component.playFootBall();
????}
}
//?換上球鞋戰(zhàn)力值增加
//?換上球襪戰(zhàn)力值增加
//?球員踢球
4.2 DUBBO源碼應(yīng)用
DUBBO是通過SPI機制實現(xiàn)裝飾器模式,我們以Protocol接口進行分析,首先分析裝飾器類,抽象裝飾器核心要點是實現(xiàn)了Component并且組合一個Component對象。
public?class?ProtocolFilterWrapper?implements?Protocol?{
????private?final?Protocol?protocol;
????public?ProtocolFilterWrapper(Protocol?protocol)?{
????????if?(protocol?==?null)?{
????????????throw?new?IllegalArgumentException("protocol?==?null");
????????}
????????this.protocol?=?protocol;
????}
}
public?class?ProtocolListenerWrapper?implements?Protocol?{
????private?final?Protocol?protocol;
????public?ProtocolListenerWrapper(Protocol?protocol)?{
????????if?(protocol?==?null)?{
????????????throw?new?IllegalArgumentException("protocol?==?null");
????????}
????????this.protocol?=?protocol;
????}
}
在配置文件中配置裝飾器:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
通過SPI機制加載擴展點時會使用裝飾器裝飾具體構(gòu)件:
public?class?ReferenceConfig<T>?extends?AbstractReferenceConfig?{
????private?static?final?Protocol?refprotocol?=?ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
????private?T?createProxy(Map?map) ?{
????????if?(isJvmRefer)?{
????????????URL?url?=?new?URL(Constants.LOCAL_PROTOCOL,?Constants.LOCALHOST_VALUE,?0,?interfaceClass.getName()).addParameters(map);
????????????invoker?=?refprotocol.refer(interfaceClass,?url);
????????????if?(logger.isInfoEnabled())?{
????????????????logger.info("Using?injvm?service?"?+?interfaceClass.getName());
????????????}
????????}
????}
}
最終生成refprotocol為如下對象:
ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol))
5 責(zé)任鏈模式
責(zé)任鏈模式將請求發(fā)送和接收解耦,讓多個接收對象都有機會處理這個請求。這些接收對象串成一條鏈路并沿著這條鏈路傳遞這個請求,直到鏈路上某個接收對象能夠處理它。我們介紹責(zé)任鏈模式兩種應(yīng)用場景和四種代碼實現(xiàn)方式,最后介紹了DUBBO如何應(yīng)用責(zé)任鏈構(gòu)建過濾器鏈路。
5.1 應(yīng)用場景:命中立即中斷
實現(xiàn)一個關(guān)鍵詞過濾功能。系統(tǒng)設(shè)置三個關(guān)鍵詞過濾器,輸入內(nèi)容命中任何一個過濾器規(guī)則就返回校驗不通過,鏈路立即中斷無需繼續(xù)進行。
(1) 實現(xiàn)方式一
public?interface?ContentFilter?{
????public?boolean?filter(String?content);
}
public?class?AaaContentFilter?implements?ContentFilter?{
????private?final?static?String?KEY_CONTENT?=?"aaa";
????@Override
????public?boolean?filter(String?content)?{
????????boolean?isValid?=?Boolean.FALSE;
????????if?(StringUtils.isEmpty(content))?{
????????????return?isValid;
????????}
????????isValid?=?!content.contains(KEY_CONTENT);
????????return?isValid;
????}
}
public?class?BbbContentFilter?implements?ContentFilter?{
????private?final?static?String?KEY_CONTENT?=?"bbb";
????@Override
????public?boolean?filter(String?content)?{
????????boolean?isValid?=?Boolean.FALSE;
????????if?(StringUtils.isEmpty(content))?{
????????????return?isValid;
????????}
????????isValid?=?!content.contains(KEY_CONTENT);
????????return?isValid;
????}
}
public?class?CccContentFilter?implements?ContentFilter?{
????private?final?static?String?KEY_CONTENT?=?"ccc";
????@Override
????public?boolean?filter(String?content)?{
????????boolean?isValid?=?Boolean.FALSE;
????????if?(StringUtils.isEmpty(content))?{
????????????return?isValid;
????????}
????????isValid?=?!content.contains(KEY_CONTENT);
????????return?isValid;
????}
}
具體過濾器已經(jīng)完成,我們下面構(gòu)造過濾器責(zé)任鏈路:
@Service
public?class?ContentFilterChain?{
????private?List?filters?=?new?ArrayList();
????@PostConstruct
????public?void?init()?{
????????ContentFilter?aaaContentFilter?=?new?AaaContentFilter();
????????ContentFilter?bbbContentFilter?=?new?BbbContentFilter();
????????ContentFilter?cccContentFilter?=?new?CccContentFilter();
????????filters.add(aaaContentFilter);
????????filters.add(bbbContentFilter);
????????filters.add(cccContentFilter);
????}
????public?void?addFilter(ContentFilter?filter)?{
????????filters.add(filter);
????}
????public?boolean?filter(String?content)?{
????????if?(CollectionUtils.isEmpty(filters))?{
????????????throw?new?RuntimeException("ContentFilterChain?is?empty");
????????}
????????for?(ContentFilter?filter?:?filters)?{
????????????boolean?isValid?=?filter.filter(content);
????????????if?(!isValid)?{
????????????????System.out.println("校驗不通過");
????????????????return?isValid;
????????????}
????????}
????????return?Boolean.TRUE;
????}
}
public?class?Test?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]?{?"classpath*:META-INF/chain/spring-core.xml"?});
????????ContentFilterChain?chain?=?(ContentFilterChain)?context.getBean("contentFilterChain");
????????System.out.println(context);
????????boolean?result1?=?chain.filter("ccc");
????????boolean?result2?=?chain.filter("ddd");
????????System.out.println("校驗結(jié)果1="?+?result1);
????????System.out.println("校驗結(jié)果2="?+?result2);
????}
}
(2) 實現(xiàn)方式二
public?abstract?class?FilterHandler?{
????/**?下一個節(jié)點?**/
????protected?FilterHandler?successor?=?null;
????public?void?setSuccessor(FilterHandler?successor)?{
????????this.successor?=?successor;
????}
????public?final?boolean?filter(String?content)?{
????????/**?執(zhí)行自身方法?**/
????????boolean?isValid?=?doFilter(content);
????????if?(!isValid)?{
????????????System.out.println("校驗不通過");
????????????return?isValid;
????????}
????????/**?執(zhí)行下一個節(jié)點鏈路?**/
????????if?(successor?!=?null?&&?this?!=?successor)?{
????????????isValid?=?successor.filter(content);
????????}
????????return?isValid;
????}
????/**?每個節(jié)點過濾方法?**/
????protected?abstract?boolean?doFilter(String?content);
}
public?class?AaaContentFilterHandler?extends?FilterHandler?{
????private?final?static?String?KEY_CONTENT?=?"aaa";
????@Override
????protected?boolean?doFilter(String?content)?{
????????boolean?isValid?=?Boolean.FALSE;
????????if?(StringUtils.isEmpty(content))?{
????????????return?isValid;
????????}
????????isValid?=?!content.contains(KEY_CONTENT);
????????return?isValid;
????}
}
//?省略其它過濾器代碼
具體過濾器已經(jīng)完成,我們下面構(gòu)造過濾器責(zé)任鏈路:
@Service
public?class?FilterHandlerChain?{
????private?FilterHandler?head?=?null;
????private?FilterHandler?tail?=?null;
????@PostConstruct
????public?void?init()?{
????????FilterHandler?aaaHandler?=?new?AaaContentFilterHandler();
????????FilterHandler?bbbHandler?=?new?BbbContentFilterHandler();
????????FilterHandler?cccHandler?=?new?CccContentFilterHandler();
????????addHandler(aaaHandler);
????????addHandler(bbbHandler);
????????addHandler(cccHandler);
????}
????public?void?addHandler(FilterHandler?handler)?{
????????if?(head?==?null)?{
????????????head?=?tail?=?handler;
????????}
????????/**?設(shè)置當(dāng)前tail繼任者?**/
????????tail.setSuccessor(handler);
????????/**?指針重新指向tail?**/
????????tail?=?handler;
????}
????public?boolean?filter(String?content)?{
????????if?(null?==?head)?{
????????????throw?new?RuntimeException("FilterHandlerChain?is?empty");
????????}
????????/**?head發(fā)起調(diào)用?**/
????????return?head.filter(content);
????}
}
public?class?Test?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]?{?"classpath*:META-INF/chain/spring-core.xml"?});
????????FilterHandlerChain?chain?=?(FilterHandlerChain)?context.getBean("filterHandlerChain");
????????System.out.println(context);
????????boolean?result1?=?chain.filter("ccc");
????????boolean?result2?=?chain.filter("ddd");
????????System.out.println("校驗結(jié)果1="?+?result1);
????????System.out.println("校驗結(jié)果2="?+?result2);
????}
}
5.2 應(yīng)用場景:全鏈路執(zhí)行
我們實現(xiàn)一個考題生成功能。在線考試系統(tǒng)根據(jù)不同年級生成不同考題。系統(tǒng)設(shè)置三個考題生成器,每個生成器都會執(zhí)行,根據(jù)學(xué)生年級決定是否生成考題,無需生成則執(zhí)行下一個生成器。
(1) 實現(xiàn)方式一
public?interface?QuestionGenerator?{
????public?Question?generateQuestion(String?gradeInfo);
}
public?class?AaaQuestionGenerator?implements?QuestionGenerator?{
????@Override
????public?Question?generateQuestion(String?gradeInfo)?{
????????if?(!gradeInfo.equals("一年級"))?{
????????????return?null;
????????}
????????Question?question?=?new?Question();
????????question.setId("aaa");
????????question.setScore(10);
????????return?question;
????}
}
//?省略其它生成器代碼
具體生成器已經(jīng)編寫完成,我們下面構(gòu)造生成器責(zé)任鏈路:
@Service
public?class?QuestionChain?{
????private?List?generators?=?new?ArrayList();
????@PostConstruct
????public?void?init()?{
????????QuestionGenerator?aaaQuestionGenerator?=?new?AaaQuestionGenerator();
????????QuestionGenerator?bbbQuestionGenerator?=?new?BbbQuestionGenerator();
????????QuestionGenerator?cccQuestionGenerator?=?new?CccQuestionGenerator();
????????generators.add(aaaQuestionGenerator);
????????generators.add(bbbQuestionGenerator);
????????generators.add(cccQuestionGenerator);
????}
????public?List?generate(String?gradeInfo)? {
????????if?(CollectionUtils.isEmpty(generators))?{
????????????throw?new?RuntimeException("QuestionChain?is?empty");
????????}
????????List?questions?=?new?ArrayList();
????????for?(QuestionGenerator?generator?:?generators)?{
????????????Question?question?=?generator.generateQuestion(gradeInfo);
????????????if?(null?==?question)?{
????????????????continue;
????????????}
????????????questions.add(question);
????????}
????????return?questions;
????}
}
public?class?Test?{
????public?static?void?main(String[]?args)?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]?{?"classpath*:META-INF/chain/spring-core.xml"?});
????????System.out.println(context);
????????QuestionChain?chain?=?(QuestionChain)?context.getBean("questionChain");
????????List?questions?=?chain.generate("一年級");
????????System.out.println(questions);
????}
}
(2) 實現(xiàn)方式二
public?abstract?class?GenerateHandler?{
????/**?下一個節(jié)點?**/
????protected?GenerateHandler?successor?=?null;
????public?void?setSuccessor(GenerateHandler?successor)?{
????????this.successor?=?successor;
????}
????public?final?List?generate(String?gradeInfo)? {
????????List?result?=?new?ArrayList();
????????/**?執(zhí)行自身方法?**/
????????Question?question?=?doGenerate(gradeInfo);
????????if?(null?!=?question)?{
????????????result.add(question);
????????}
????????/**?執(zhí)行下一個節(jié)點鏈路?**/
????????if?(successor?!=?null?&&?this?!=?successor)?{
????????????List?successorQuestions?=?successor.generate(gradeInfo);
????????????if?(null?!=?successorQuestions)?{
????????????????result.addAll(successorQuestions);
????????????}
????????}
????????return?result;
????}
????/**?每個節(jié)點生成方法?**/
????protected?abstract?Question?doGenerate(String?gradeInfo);
}
public?class?AaaGenerateHandler?extends?GenerateHandler?{
????@Override
????protected?Question?doGenerate(String?gradeInfo)?{
????????if?(!gradeInfo.equals("一年級"))?{
????????????return?null;
????????}
????????Question?question?=?new?Question();
????????question.setId("aaa");
????????question.setScore(10);
????????return?question;
????}
}
//?省略其它生成器代碼
具體生成器已經(jīng)完成,我們下面構(gòu)造生成器責(zé)任鏈路:
@Service
public?class?GenerateChain?{
????private?GenerateHandler?head?=?null;
????private?GenerateHandler?tail?=?null;
????@PostConstruct
????public?void?init()?{
????????GenerateHandler?aaaHandler?=?new?AaaGenerateHandler();
????????GenerateHandler?bbbHandler?=?new?BbbGenerateHandler();
????????GenerateHandler?cccHandler?=?new?CccGenerateHandler();
????????addHandler(aaaHandler);
????????addHandler(bbbHandler);
????????addHandler(cccHandler);
????}
????public?void?addHandler(GenerateHandler?handler)?{
????????if?(head?==?null)?{
????????????head?=?tail?=?handler;
????????}
????????/**?設(shè)置當(dāng)前tail繼任者?**/
????????tail.setSuccessor(handler);
????????/**?指針重新指向tail?**/
????????tail?=?handler;
????}
????public?List?generate(String?gradeInfo)? {
????????if?(null?==?head)?{
????????????throw?new?RuntimeException("GenerateChain?is?empty");
????????}
????????/**?head發(fā)起調(diào)用?**/
????????return?head.generate(gradeInfo);
????}
}
public?class?Test?{
????public?static?void?main(String[]?args)?{
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]?{?"classpath*:META-INF/chain/spring-core.xml"?});
????????GenerateChain?chain?=?(GenerateChain)?context.getBean("generateChain");
????????System.out.println(context);
????????List?result?=?chain.generate("一年級");
????????System.out.println(result);
????}
}
5.3 DUBBO源碼應(yīng)用
生產(chǎn)者和消費者最終執(zhí)行對象都是過濾器鏈路最后一個節(jié)點,整個鏈路包含多個過濾器進行業(yè)務(wù)處理。我們看看生產(chǎn)者和消費者最終生成的過濾器鏈路。
生產(chǎn)者過濾器鏈路
EchoFilter?>?ClassloaderFilter?>?GenericFilter?>?ContextFilter?>?TraceFilter?>?TimeoutFilter?>?MonitorFilter?>?ExceptionFilter?>?AbstractProxyInvoker
消費者過濾器鏈路
ConsumerContextFilter?>?FutureFilter?>?MonitorFilter?>?DubboInvoker
ProtocolFilterWrapper作為鏈路生成核心通過匿名類方式構(gòu)建過濾器鏈路,我們以消費者構(gòu)建過濾器鏈路為例:
public?class?ProtocolFilterWrapper?implements?Protocol?{
????private?static??Invoker?buildInvokerChain(final?Invoker?invoker,?String?key,?String?group) ? {
????????//?invoker?=?DubboInvoker
????????Invoker?last?=?invoker;
????????//?查詢符合條件過濾器列表
????????List?filters?=?ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),?key,?group);
????????if?(!filters.isEmpty())?{
????????????for?(int?i?=?filters.size()?-?1;?i?>=?0;?i--)?{
????????????????final?Filter?filter?=?filters.get(i);
????????????????final?Invoker?next?=?last;
????????????????//?構(gòu)造一個簡化Invoker
????????????????last?=?new?Invoker()?{
????????????????????@Override
????????????????????public?Class?getInterface()? {
????????????????????????return?invoker.getInterface();
????????????????????}
????????????????????@Override
????????????????????public?URL?getUrl()?{
????????????????????????return?invoker.getUrl();
????????????????????}
????????????????????@Override
????????????????????public?boolean?isAvailable()?{
????????????????????????return?invoker.isAvailable();
????????????????????}
????????????????????@Override
????????????????????public?Result?invoke(Invocation?invocation)?throws?RpcException?{
????????????????????????//?構(gòu)造過濾器鏈路
????????????????????????Result?result?=?filter.invoke(next,?invocation);
????????????????????????if?(result?instanceof?AsyncRpcResult)?{
????????????????????????????AsyncRpcResult?asyncResult?=?(AsyncRpcResult)?result;
????????????????????????????asyncResult.thenApplyWithContext(r?->?filter.onResponse(r,?invoker,?invocation));
????????????????????????????return?asyncResult;
????????????????????????}?else?{
????????????????????????????return?filter.onResponse(result,?invoker,?invocation);
????????????????????????}
????????????????????}
????????????????????@Override
????????????????????public?void?destroy()?{
????????????????????????invoker.destroy();
????????????????????}
????????????????????@Override
????????????????????public?String?toString()?{
????????????????????????return?invoker.toString();
????????????????????}
????????????????};
????????????}
????????}
????????return?last;
????}
????@Override
????public??Invoker?refer(Class?type,?URL?url) ?throws?RpcException? {
????????//?RegistryProtocol不構(gòu)造過濾器鏈路
????????if?(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol()))?{
????????????return?protocol.refer(type,?url);
????????}
????????Invoker?invoker?=?protocol.refer(type,?url);
????????return?buildInvokerChain(invoker,?Constants.REFERENCE_FILTER_KEY,?Constants.CONSUMER);
????}
}
6 保護性暫停模式
在多線程編程實踐中我們肯定會面臨線程間數(shù)據(jù)交互的問題。在處理這類問題時需要使用一些設(shè)計模式,從而保證程序的正確性和健壯性。
保護性暫停設(shè)計模式就是解決多線程間數(shù)據(jù)交互問題的一種模式。本文先從基礎(chǔ)案例介紹保護性暫?;靖拍詈蛯嵺`,再由淺入深,最終分析DUBBO源碼中保護性暫停設(shè)計模式使用場景。
6.1 保護性暫停實例
我們設(shè)想這樣一種場景:線程A生產(chǎn)數(shù)據(jù),線程B讀取數(shù)據(jù)這個數(shù)據(jù)。
但是有一種情況:線程B準(zhǔn)備讀取數(shù)據(jù)時,此時線程A還沒有生產(chǎn)出數(shù)據(jù)。
在這種情況下線程B不能一直空轉(zhuǎn),也不能立即退出,線程B要等到生產(chǎn)數(shù)據(jù)完成并拿到數(shù)據(jù)之后才退出。
那么在數(shù)據(jù)沒有生產(chǎn)出這段時間,線程B需要執(zhí)行一種等待機制,這樣可以達(dá)到對系統(tǒng)保護目的,這就是保護性暫停。
保護性暫停有多種實現(xiàn)方式,本文我們用synchronized/wait/notify的方式實現(xiàn)。
class?Resource?{
????private?MyData?data;
????private?Object?lock?=?new?Object();
????public?MyData?getData(int?timeOut)?{
????????synchronized?(lock)?{
????????????//?運行時長
????????????long?timePassed?=?0;
????????????//?開始時間
????????????long?begin?=?System.currentTimeMillis();
????????????//?如果結(jié)果為空
????????????while?(data?==?null)?{
????????????????try?{
????????????????????//?如果運行時長大于超時時間退出循環(huán)
????????????????????if?(timePassed?>?timeOut)?{
????????????????????????break;
????????????????????}
????????????????????//?如果運行時長小于超時時間表示虛假喚醒?->?只需再等待時間差值
????????????????????long?waitTime?=?timeOut?-?timePassed;
????????????????????//?等待時間差值
????????????????????lock.wait(waitTime);
????????????????????//?結(jié)果不為空直接返回
????????????????????if?(data?!=?null)?{
????????????????????????break;
????????????????????}
????????????????????//?被喚醒后計算運行時長
????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(data?==?null)?{
????????????????throw?new?RuntimeException("超時未獲取到結(jié)果");
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyData?data)?{
????????synchronized?(lock)?{
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
/**
?*?保護性暫停實例
?*/
public?class?ProtectDesignTest?{
????public?static?void?main(String[]?args)?{
????????Resource?resource?=?new?Resource();
????????new?Thread(()?->?{
????????????try?{
????????????????MyData?data?=?new?MyData("hello");
????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
????????????????//?模擬發(fā)送耗時
????????????????TimeUnit.SECONDS.sleep(3);
????????????????resource.sendData(data);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????},?"t1").start();
????????new?Thread(()?->?{
????????????MyData?data?=?resource.getData(1000);
????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
????????},?"t2").start();
????}
}
6.2 加一個編號
現(xiàn)在再來設(shè)想一個場景:現(xiàn)在有三個生產(chǎn)數(shù)據(jù)的線程1、2、3,三個獲取數(shù)據(jù)的線程4、5、6,我們希望每個獲取數(shù)據(jù)線程都只拿到其中一個生產(chǎn)線程的數(shù)據(jù),不能多拿也不能少拿。
這里引入一個Futures模型,這個模型為每個資源進行編號并存儲在容器中,例如線程1生產(chǎn)的數(shù)據(jù)被拿走則從容器中刪除,一直到容器為空結(jié)束。
@Getter
@Setter
public?class?MyNewData?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?1L;
????private?static?final?AtomicLong?ID?=?new?AtomicLong(0);
????private?Long?id;
????private?String?message;
????public?MyNewData(String?message)?{
????????this.id?=?newId();
????????this.message?=?message;
????}
????/**
?????*?自增到最大值會回到最小值(負(fù)值可以作為識別ID)
?????*/
????private?static?long?newId()?{
????????return?ID.getAndIncrement();
????}
????public?Long?getId()?{
????????return?this.id;
????}
}
class?MyResource?{
????private?MyNewData?data;
????private?Object?lock?=?new?Object();
????public?MyNewData?getData(int?timeOut)?{
????????synchronized?(lock)?{
????????????long?timePassed?=?0;
????????????long?begin?=?System.currentTimeMillis();
????????????while?(data?==?null)?{
????????????????try?{
????????????????????if?(timePassed?>?timeOut)?{
????????????????????????break;
????????????????????}
????????????????????long?waitTime?=?timeOut?-?timePassed;
????????????????????lock.wait(waitTime);
????????????????????if?(data?!=?null)?{
????????????????????????break;
????????????????????}
????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(data?==?null)?{
????????????????throw?new?RuntimeException("超時未獲取到結(jié)果");
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyNewData?data)?{
????????synchronized?(lock)?{
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
class?MyFutures?{
????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();
????public?static?MyResource?newResource(MyNewData?data)?{
????????final?MyResource?future?=?new?MyResource();
????????FUTURES.put(data.getId(),?future);
????????return?future;
????}
????public?static?MyResource?getResource(Long?id)?{
????????return?FUTURES.remove(id);
????}
????public?static?Set?getIds()? {
????????return?FUTURES.keySet();
????}
}
/**
?*?保護性暫停實例
?*/
public?class?ProtectDesignTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????for?(int?i?=?0;?i?3;?i++)?{
????????????final?int?index?=?i;
????????????new?Thread(()?->?{
????????????????try?{
????????????????????MyNewData?data?=?new?MyNewData("hello_"?+?index);
????????????????????MyResource?resource?=?MyFutures.newResource(data);
????????????????????//?模擬發(fā)送耗時
????????????????????TimeUnit.SECONDS.sleep(1);
????????????????????resource.sendData(data);
????????????????????System.out.println("生產(chǎn)數(shù)據(jù)data="?+?data);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????????TimeUnit.SECONDS.sleep(1);
????????for?(Long?i?:?MyFutures.getIds())?{
????????????final?long?index?=?i;
????????????new?Thread(()?->?{
????????????????MyResource?resource?=?MyFutures.getResource(index);
????????????????int?timeOut?=?3000;
????????????????System.out.println("接收數(shù)據(jù)data="?+?resource.getData(timeOut));
????????????}).start();
????????}
????}
}
6.3 DUBBO源碼應(yīng)用
我們順著這一個鏈路跟蹤代碼:消費者發(fā)送請求 > 提供者接收請求并執(zhí)行,并且將運行結(jié)果發(fā)送給消費者 > 消費者接收結(jié)果。
(1) 消費者發(fā)送請求
消費者發(fā)送的數(shù)據(jù)包含請求ID,并且將關(guān)系維護進FUTURES容器
final?class?HeaderExchangeChannel?implements?ExchangeChannel?{
????@Override
????public?ResponseFuture?request(Object?request,?int?timeout)?throws?RemotingException?{
????????if?(closed)?{
????????????throw?new?RemotingException(this.getLocalAddress(),?null,?"Failed?to?send?request?"?+?request?+?",?cause:?The?channel?"?+?this?+?"?is?closed!");
????????}
????????Request?req?=?new?Request();
????????req.setVersion(Version.getProtocolVersion());
????????req.setTwoWay(true);
????????req.setData(request);
????????DefaultFuture?future?=?DefaultFuture.newFuture(channel,?req,?timeout);
????????try?{
????????????channel.send(req);
????????}?catch?(RemotingException?e)?{
????????????future.cancel();
????????????throw?e;
????????}
????????return?future;
????}
}
class?DefaultFuture?implements?ResponseFuture?{
????//?FUTURES容器
????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();
????private?DefaultFuture(Channel?channel,?Request?request,?int?timeout)?{
????????this.channel?=?channel;
????????this.request?=?request;
????????//?請求ID
????????this.id?=?request.getId();
????????this.timeout?=?timeout?>?0???timeout?:?channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,?Constants.DEFAULT_TIMEOUT);
????????FUTURES.put(id,?this);
????????CHANNELS.put(id,?channel);
????}
}
(2) 提供者接收請求并執(zhí)行,并且將運行結(jié)果發(fā)送給消費者
public?class?HeaderExchangeHandler?implements?ChannelHandlerDelegate?{
????void?handleRequest(final?ExchangeChannel?channel,?Request?req)?throws?RemotingException?{
????????//?response與請求ID對應(yīng)
????????Response?res?=?new?Response(req.getId(),?req.getVersion());
????????if?(req.isBroken())?{
????????????Object?data?=?req.getData();
????????????String?msg;
????????????if?(data?==?null)?{
????????????????msg?=?null;
????????????}?else?if?(data?instanceof?Throwable)?{
????????????????msg?=?StringUtils.toString((Throwable)?data);
????????????}?else?{
????????????????msg?=?data.toString();
????????????}
????????????res.setErrorMessage("Fail?to?decode?request?due?to:?"?+?msg);
????????????res.setStatus(Response.BAD_REQUEST);
????????????channel.send(res);
????????????return;
????????}
????????//?message?=?RpcInvocation包含方法名、參數(shù)名、參數(shù)值等
????????Object?msg?=?req.getData();
????????try?{
????????????//?DubboProtocol.reply執(zhí)行實際業(yè)務(wù)方法
????????????CompletableFuture(3) 消費者接收結(jié)果
以下DUBBO源碼很好體現(xiàn)了保護性暫停這個設(shè)計模式,說明參看注釋
class?DefaultFuture?implements?ResponseFuture?{
????private?final?Lock?lock?=?new?ReentrantLock();
????private?final?Condition?done?=?lock.newCondition();
????public?static?void?received(Channel?channel,?Response?response)?{
????????try?{
????????????//?取出對應(yīng)的請求對象
????????????DefaultFuture?future?=?FUTURES.remove(response.getId());
????????????if?(future?!=?null)?{
????????????????future.doReceived(response);
????????????}?else?{
????????????????logger.warn("The?timeout?response?finally?returned?at?"
????????????????????????????+?(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS").format(new?Date()))
????????????????????????????+?",?response?"?+?response
????????????????????????????+?(channel?==?null???""?:?",?channel:?"?+?channel.getLocalAddress()
???????????????????????????????+?"?->?"?+?channel.getRemoteAddress()));
????????????}
????????}?finally?{
????????????CHANNELS.remove(response.getId());
????????}
????}
????@Override
????public?Object?get(int?timeout)?throws?RemotingException?{
????????if?(timeout?<=?0)?{
????????????timeout?=?Constants.DEFAULT_TIMEOUT;
????????}
????????if?(!isDone())?{
????????????long?start?=?System.currentTimeMillis();
????????????lock.lock();
????????????try?{
????????????????while?(!isDone())?{
????????????????????//?放棄鎖并使當(dāng)前線程阻塞,直到發(fā)出信號中斷它或者達(dá)到超時時間
????????????????????done.await(timeout,?TimeUnit.MILLISECONDS);
????????????????????//?阻塞結(jié)束后再判斷是否完成
????????????????????if?(isDone())?{
????????????????????????break;
????????????????????}
????????????????????//?阻塞結(jié)束后判斷是否超時
????????????????????if(System.currentTimeMillis()?-?start?>?timeout)?{
????????????????????????break;
????????????????????}
????????????????}
????????????}?catch?(InterruptedException?e)?{
????????????????throw?new?RuntimeException(e);
????????????}?finally?{
????????????????lock.unlock();
????????????}
????????????//?response對象仍然為空則拋出超時異常
????????????if?(!isDone())?{
????????????????throw?new?TimeoutException(sent?>?0,?channel,?getTimeoutMessage(false));
????????????}
????????}
????????return?returnFromResponse();
????}
????private?void?doReceived(Response?res)?{
????????lock.lock();
????????try?{
????????????//?接收到服務(wù)器響應(yīng)賦值response
????????????response?=?res;
????????????if?(done?!=?null)?{
????????????????//?喚醒get方法中處于等待的代碼塊
????????????????done.signal();
????????????}
????????}?finally?{
????????????lock.unlock();
????????}
????????if?(callback?!=?null)?{
????????????invokeCallback(callback);
????????}
????}
}
7 雙重檢查鎖模式
單例設(shè)計模式可以保證在整個應(yīng)用某個類只能存在一個對象實例,并且這個類只提供一個取得其對象實例方法,通常這個對象創(chuàng)建和銷毀比較消耗資源,例如數(shù)據(jù)庫連接對象等等。我們分析一個雙重檢查鎖實現(xiàn)的單例模式實例。
public?class?MyDCLConnection?{
????private?static?volatile?MyDCLConnection?myConnection?=?null;
????private?MyDCLConnection()?{
????????System.out.println(Thread.currentThread().getName()?+?"?->?init?connection");
????}
????public?static?MyDCLConnection?getConnection()?{
????????if?(null?==?myConnection)?{
????????????synchronized?(MyDCLConnection.class)?{
????????????????if?(null?==?myConnection)?{
????????????????????myConnection?=?new?MyDCLConnection();
????????????????}
????????????}
????????}
????????return?myConnection;
????}
}
在DUBBO服務(wù)本地暴露時使用了雙重檢查鎖模式判斷exporter是否已經(jīng)存在避免重復(fù)創(chuàng)建:
public?class?RegistryProtocol?implements?Protocol?{
????private??ExporterChangeableWrapper?doLocalExport(final?Invoker?originInvoker,?URL?providerUrl) ? {
????????String?key?=?getCacheKey(originInvoker);
????????ExporterChangeableWrapper?exporter?=?(ExporterChangeableWrapper)?bounds.get(key);
????????if?(exporter?==?null)?{
????????????synchronized?(bounds)?{
????????????????exporter?=?(ExporterChangeableWrapper)?bounds.get(key);
????????????????if?(exporter?==?null)?{
????????????????????final?Invoker>?invokerDelegete?=?new?InvokerDelegate(originInvoker,?providerUrl);
????????????????????final?Exporter?strongExporter?=?(Exporter)?protocol.export(invokerDelegete);
????????????????????exporter?=?new?ExporterChangeableWrapper(strongExporter,?originInvoker);
????????????????????bounds.put(key,?exporter);
????????????????}
????????????}
????????}
????????return?exporter;
????}
}
8 文章總結(jié)
本文我們結(jié)合DUBBO源碼分析了模板方法模式、動態(tài)代理模式、策略模式、裝飾器模式、責(zé)任鏈模式、保護性暫停模式、雙重檢查鎖模式,我認(rèn)為在閱讀源碼時要學(xué)習(xí)其中優(yōu)秀的設(shè)計模式和代碼實例,這樣有助于提高代碼水平,希望本文對大家有所幫助。
JAVA前線?
歡迎大家關(guān)注公眾號「JAVA前線」查看更多精彩分享,主要包括源碼分析、實際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)
