1. Overview of the important Protocol layer in Dubbo


        2021-04-17 14:32

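        Protocol layer overview

        • A protocol is a set of rules that several parties agree to follow so that they can cooperate to implement some function

        • Common communication protocols include the seven-layer OSI model, HTTP, TCP, and UDP

        • The seven OSI layers: application, presentation, session, transport, network, data link, and physical. The model lays out the top-level framework of network communication and only defines interfaces [very abstract]

        • HTTP: brings to mind an application-layer protocol, the URL format, HTTP headers, the HTTP body, GET | POST ... [fairly concrete]

        • TCP: brings to mind a transport-layer protocol, the three-way handshake and four-way teardown, reliable delivery ... [fairly concrete]

        • Whichever protocol it is, it specifies concrete rules for some [or all] stages of network communication

        • The Dubbo framework offers many communication protocols to choose from; different protocols amount to different routes to implementing RPC

          • Think of RPC as "travelling from city A to city B"

          • The available protocols are then the different means of transport from A to B

          • Choosing one means of transport decides which ticket to buy, who the driver is, how comfortable the trip is, which route is taken ...

          • Likewise, every Dubbo protocol fixes [or defaults] the serialization format, [long-lived | short-lived] connections, the underlying protocol [http | tcp], synchronous or asynchronous calls, the message-handling thread model ...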
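
As a rough illustration of the last point, the choices a protocol bundles can be written down as plain data. This is a minimal sketch, not Dubbo code: `ProtocolProfile` and `ProtocolProfiles` are hypothetical names, and the three rows only restate defaults described later in this article (hessian2 over a long-lived TCP connection for dubbo, JSON or hessian over HTTP for the proxy protocols).

```java
import java.util.List;

/** Hypothetical summary of what choosing a protocol fixes or defaults. */
final class ProtocolProfile {
    final String name;
    final String serialization;
    final boolean longConnection;
    final String transport;
    final boolean async;

    ProtocolProfile(String name, String serialization, boolean longConnection,
                    String transport, boolean async) {
        this.name = name;
        this.serialization = serialization;
        this.longConnection = longConnection;
        this.transport = transport;
        this.async = async;
    }
}

final class ProtocolProfiles {
    // Values restate the defaults described in this article; they are not read from Dubbo.
    static final List<ProtocolProfile> KNOWN = List.of(
            new ProtocolProfile("dubbo", "hessian2", true, "tcp", true),
            new ProtocolProfile("http", "json", false, "http", false),
            new ProtocolProfile("hessian", "hessian", false, "http", false));

    /** Looks up a profile by protocol name, failing loudly for unknown names. */
    static ProtocolProfile byName(String name) {
        for (ProtocolProfile profile : KNOWN) {
            if (profile.name.equals(name)) {
                return profile;
            }
        }
        throw new IllegalArgumentException("Unknown protocol: " + name);
    }

    public static void main(String[] args) {
        ProtocolProfile p = byName("dubbo");
        System.out.println(p.name + ": " + p.serialization + " over " + p.transport
                + ", long connection = " + p.longConnection);
    }
}
```

Picking a protocol name is then equivalent to picking one row of this table, which is the "means of transport" idea from the analogy.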

        Protocol in Dubbo

        • org.apache.dubbo.rpc.Protocol#export: the provider exports an invoker object

        • Core function 1: start an xxxServer and expose the service execution object [invoker] on the network

        • org.apache.dubbo.rpc.Protocol#refer: the consumer converts a URL into an invoker object

        • Core function 2: create an xxxClient and wrap it as a call object [invoker] for the consumer to use

        • Protocol at a glance

           

          1. // 默認(rèn)rpc層協(xié)議采用:DubboProtocol

          2. @SPI("dubbo")

          3. public interface Protocol {


          4. // provider 暴露服務(wù)階段調(diào)用

          5. // invoker:rpc接口實(shí)現(xiàn)類[impl]轉(zhuǎn)成invoker對(duì)象

          6. // 通過(guò)調(diào)用org.apache.dubbo.rpc.ProxyFactory#getInvoker進(jìn)行【impl與invoker】 轉(zhuǎn)換

          7. // provider方法執(zhí)行鏈:【.. -> Invoker#invoker -> Wrapper#invokeMethod -> Impl#xxx】

          8. //-------------------invoker 說(shuō)明 end------------------------

          9. // 例:http協(xié)議暴露服務(wù):?jiǎn)?dòng)http服務(wù)器,將對(duì)應(yīng)接口實(shí)現(xiàn)類暴露成http 服務(wù)

          10. //     服務(wù)暴露URL 例 : http://127.0.0.1:49178/org.apache.dubbo.rpc.protocol.http.HttpService?version=1.0.0

          11. // 例:dubbo協(xié)議暴露服務(wù):默認(rèn)啟動(dòng)netty server進(jìn)行

          12. @Adaptive

          13. <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;


          14. // consumer 引用服務(wù)階段執(zhí)行

          15. // 根據(jù)接口類型type,與遠(yuǎn)程服務(wù)url生成invoker對(duì)象

          16. // consumer方法調(diào)用鏈:rpcInterface#xxx -> proxy#xxx -> invoker#invoker -> nettyClient#send【默認(rèn)】

          17. @Adaptive

          18. <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;


          19. void destroy();

          20. 。。。

          21. }
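
The `@SPI("dubbo")` and `@Adaptive` annotations mean that the concrete protocol is chosen at call time from the URL, falling back to `dubbo`. The sketch below imitates that dispatch with plain Java. It is an illustration under stated assumptions: `SimpleProtocol`, `AdaptiveProtocol`, and the string-based URL parsing are hypothetical simplifications, not Dubbo's generated adaptive class or its `ExtensionLoader`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, much-reduced stand-in for the Protocol interface. */
interface SimpleProtocol {
    String export(String url);
}

/** Picks the target protocol from the URL scheme, defaulting to "dubbo". */
final class AdaptiveProtocol implements SimpleProtocol {
    static final String DEFAULT_NAME = "dubbo"; // mirrors @SPI("dubbo")

    private final Map<String, SimpleProtocol> extensions = new ConcurrentHashMap<>();

    void register(String name, SimpleProtocol protocol) {
        extensions.put(name, protocol);
    }

    /** Returns the scheme before "://", or the default name when there is none. */
    static String protocolName(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : DEFAULT_NAME;
    }

    @Override
    public String export(String url) {
        String name = protocolName(url);
        SimpleProtocol target = extensions.get(name);
        if (target == null) {
            throw new IllegalStateException("No protocol extension named " + name);
        }
        return target.export(url);
    }
}

final class AdaptiveProtocolDemo {
    public static void main(String[] args) {
        AdaptiveProtocol adaptive = new AdaptiveProtocol();
        adaptive.register("dubbo", url -> "netty server for " + url);
        adaptive.register("http", url -> "http server for " + url);
        System.out.println(adaptive.export("http://127.0.0.1:49178/demo.HttpService"));
        System.out.println(adaptive.export("127.0.0.1:20880/demo.DemoService"));
    }
}
```

The caller only ever holds the adaptive object, so switching protocols is a matter of changing the URL rather than the calling code.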

        • AbstractProtocol and its protocol subclasses all live under the dubbo-rpc module

        • DubboProtocol, RedisProtocol, and MemcachedProtocol extend AbstractProtocol directly

        • The other common protocols in the dubbo-rpc packages extend AbstractProxyProtocol

        • RegistryProtocol handles the service registration and subscription phase [not analyzed in this article]

        • ProtocolFilterWrapper and ProtocolListenerWrapper are protocol wrapper [proxy] classes

        public abstract class AbstractProtocol implements Protocol {
            // Map of exported services
            protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

            @Override
            public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
                // Wrap the invoker so that asynchronous calls can be consumed synchronously
                return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
            }

            // Subclasses implement this method to perform the actual [service reference]
            protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
            // ...
        }

        • AbstractProtocol defines the exporterMap field, which holds the exported services

        • It also defines a new abstract method for service reference: protocolBindingRefer
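
The shape of `refer` above is a template method: the parent wraps whatever the subclass returns. A minimal sketch of that pattern follows, using only `CompletableFuture`. The names `AsyncInvoker`, `BlockingInvoker`, and `TemplateProtocol` are hypothetical, and unlike Dubbo's `AsyncToSyncInvoker` this wrapper always blocks and uses a fixed timeout, which is an assumption made to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical invoker whose calls complete asynchronously. */
interface AsyncInvoker {
    CompletableFuture<String> invoke(String request);
}

/** Wrapper that waits for the result before handing the future back. */
final class BlockingInvoker implements AsyncInvoker {
    private final AsyncInvoker delegate;
    private final long timeoutMillis;

    BlockingInvoker(AsyncInvoker delegate, long timeoutMillis) {
        this.delegate = delegate;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public CompletableFuture<String> invoke(String request) {
        CompletableFuture<String> future = delegate.invoke(request);
        try {
            // Block the calling thread until the asynchronous call has finished.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + request, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Call failed: " + request, e);
        }
        return future;
    }
}

/** Parent fixes the wrapping step; subclasses only supply the binding-specific invoker. */
abstract class TemplateProtocol {
    public final AsyncInvoker refer(String url) {
        return new BlockingInvoker(bindingRefer(url), 1000L);
    }

    protected abstract AsyncInvoker bindingRefer(String url);
}

final class TemplateProtocolDemo {
    public static void main(String[] args) {
        TemplateProtocol protocol = new TemplateProtocol() {
            @Override
            protected AsyncInvoker bindingRefer(String url) {
                return request -> CompletableFuture.supplyAsync(() -> url + "#" + request);
            }
        };
        System.out.println(protocol.refer("demo://host/svc").invoke("ping").join());
    }
}
```

Because `refer` is final in the sketch, every subclass gets the same wrapping behavior without repeating it.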

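        Protocol categories in the dubbo-rpc module

        Proxy protocols

        • AbstractProxyProtocol

        • Core function: converting between RPC interface proxy objects and invoker objects [after reading the class, the name turns out to be very apt]

        • The parameter and return types of export and refer as defined by Protocol differ from those of the doExport and doRefer methods implemented by AbstractProxyProtocol subclasses, so a conversion is required

        • export and refer in AbstractProxyProtocol call the subclass's doExport and doRefer and convert between the proxy [interface proxy object] and the invoker object

        • Could an AbstractProxyProtocol subclass take or return the same types as Protocol's export and refer directly, or do the conversion itself? Answer: it cannot take or return the same types directly [see the subclass implementations below]. A subclass could convert internally, but every subclass would need the same conversion, so the shared logic is pulled up into the parent class

        • [DubboProtocol's export and refer methods perform one conversion fewer than AbstractProxyProtocol's]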

        public abstract class AbstractProxyProtocol extends AbstractProtocol {

            private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();

            // Proxy factory
            protected ProxyFactory proxyFactory;

            public AbstractProxyProtocol() {
            }

            public AbstractProxyProtocol(Class<?>... exceptions) {
                for (Class<?> exception : exceptions) {
                    addRpcException(exception);
                }
            }

            public ProxyFactory getProxyFactory() {
                return proxyFactory;
            }

            public void setProxyFactory(ProxyFactory proxyFactory) {
                this.proxyFactory = proxyFactory;
            }

            // Subclasses implementing this export method receive the RPC interface implementation impl [or a proxy of it],
            // whereas the Protocol interface method is: <T> Exporter<T> export(Invoker<T> invoker);
            // so this proxy protocol has to convert between the invoker and the proxied implementation impl
            protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

            // Subclasses implementing this reference method return an RPC interface proxy object,
            // whereas the Protocol interface method is: <T> Invoker<T> refer(Class<T> type, URL url);
            // so this proxy protocol has to convert between the invoker and the RPC interface proxy
            protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

            @Override
            @SuppressWarnings("unchecked")
            public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
                final String uri = serviceKey(invoker.getUrl());
                Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
                if (exporter != null) {
                    // When modifying the configuration through override, you need to re-expose the newly modified service.
                    if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
                        return exporter;
                    }
                }
                // During export the invoker object is converted into an interface proxy object, which is then exposed
                // [DubboProtocol does not perform this conversion]
                final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
                exporter = new AbstractExporter<T>(invoker) {
                    @Override
                    public void unexport() {
                        super.unexport();
                        exporterMap.remove(uri);
                        if (runnable != null) {
                            try {
                                runnable.run();
                            } catch (Throwable t) {
                                logger.warn(t.getMessage(), t);
                            }
                        }
                    }
                };
                exporterMap.put(uri, exporter);
                return exporter;
            }

            @Override
            protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
                // During reference there is one extra conversion from the interface proxy object to an invoker
                // [DubboProtocol does not perform this conversion]
                final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
                Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
                    @Override
                    protected Result doInvoke(Invocation invocation) throws Throwable {
                        try {
                            Result result = target.invoke(invocation);
                            // FIXME result is an AsyncRpcResult instance.
                            Throwable e = result.getException();
                            if (e != null) {
                                for (Class<?> rpcException : rpcExceptions) {
                                    if (rpcException.isAssignableFrom(e.getClass())) {
                                        throw getRpcException(type, url, invocation, e);
                                    }
                                }
                            }
                            return result;
                        } catch (RpcException e) {
                            if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                                e.setCode(getErrorCode(e.getCause()));
                            }
                            throw e;
                        } catch (Throwable e) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                };
                invokers.add(invoker);
                return invoker;
            }

            // ...
        }
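
The two conversions performed by AbstractProxyProtocol can be imitated with the JDK alone. The sketch below is an assumption-laden illustration rather than Dubbo's `ProxyFactory`: `MiniInvoker`, `MiniProxyFactory`, and `GreetingService` are hypothetical, the impl-to-invoker direction uses plain reflection, and the invoker-to-proxy direction uses `java.lang.reflect.Proxy`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical, simplified stand-in for Dubbo's Invoker. */
interface MiniInvoker {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args);
}

final class MiniProxyFactory {
    /** impl -> invoker: looks the method up on the interface and calls it reflectively. */
    static <T> MiniInvoker getInvoker(T impl, Class<T> type) {
        return (methodName, parameterTypes, args) -> {
            try {
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(impl, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Invocation failed: " + methodName, e);
            }
        };
    }

    /** invoker -> proxy: every interface call is forwarded to the invoker. */
    @SuppressWarnings("unchecked")
    static <T> T getProxy(MiniInvoker invoker, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.invoke(method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}

/** Hypothetical RPC interface used only for the demonstration. */
interface GreetingService {
    String hello(String name);
}

final class ProxyConversionDemo {
    public static void main(String[] args) {
        GreetingService impl = name -> "hello " + name;
        // export side: the framework holds an invoker, the transport library wants an object
        MiniInvoker invoker = MiniProxyFactory.getInvoker(impl, GreetingService.class);
        GreetingService proxy = MiniProxyFactory.getProxy(invoker, GreetingService.class);
        System.out.println(proxy.hello("dubbo"));
    }
}
```

Third-party libraries such as jsonrpc4j or hessian expect a plain object implementing the service interface, which is why the proxy protocols need this round trip and DubboProtocol, which works on invokers directly, does not.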

        Categories by function

        Single connection, long-lived connection, NIO asynchronous transport, TCP

        • DubboProtocol [non-proxy protocol]

        • Service export calls Exchangers.bind(url, requestHandler); [passing requestHandler]

        • Service reference calls Exchangers.connect(url, requestHandler); [passing requestHandler]

        • requestHandler: the request handler; it also handles callback requests [the server calling back into the client]

        • None of the other protocol classes under the Dubbo rpc modules contain callback handling

        • Neither export nor reference needs to create an RPC interface proxy object [a structural difference from the AbstractProxyProtocol subclasses]

        • Internally, Exchangers selects one of the Java NIO frameworks [netty | mina | grizzly]

        • Export uses nettyServer [default], while reference may use, for example, minaClient [configurable, choosing among netty | mina | grizzly]

        • Uses hessian2 serialization by default, with Dubbo's own packet structure

        • By default the referencing side creates a single long-lived connection

        public class DubboProtocol extends AbstractProtocol {

            // Request handler.
            // Callback requests are handled here as well.
            private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

                @Override
                public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // Check whether this is a callback invocation
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        // ...
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    Result result = invoker.invoke(inv);
                    return result.thenApply(Function.identity());
                }
                // ...
            };

            // The invoker object is exported directly;
            // no interface proxy object is created [unlike AbstractProxyProtocol].
            // openServer(url) registers requestHandler to handle all requests.
            @Override
            public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
                URL url = invoker.getUrl();

                // export service.
                String key = serviceKey(url);
                DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
                exporterMap.put(key, exporter);

                //export an stub service for dispatching event
                Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
                Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
                if (isStubSupportEvent && !isCallbackservice) {
                    String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                        if (logger.isWarnEnabled()) {
                            logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                    "], has set stubproxy support event ,but no stub methods founded."));
                        }

                    }
                }

                // Start the server
                openServer(url);
                optimizeSerialization(url);

                return exporter;
            }

            private void openServer(URL url) {
                // find server.
                String key = url.getAddress();
                //client can export a service which's only for server to invoke
                boolean isServer = url.getParameter(IS_SERVER_KEY, true);
                if (isServer) {
                    // serverMap is a field of AbstractProtocol.
                    // It acts as a cache of servers
                    // and keeps exporting on the same address idempotent.
                    ProtocolServer server = serverMap.get(key);
                    if (server == null) {
                        synchronized (this) {
                            server = serverMap.get(key);
                            if (server == null) {
                                serverMap.put(key, createServer(url));
                            }
                        }
                    } else {
                        // server supports reset, use together with override
                        server.reset(url);
                    }
                }
            }

            private ProtocolServer createServer(URL url) {
                url = URLBuilder.from(url)
                        // send readonly event when server closes, it's enabled by default
                        .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                        // enable heartbeat by default
                        .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                        .addParameter(CODEC_KEY, DubboCodec.NAME)
                        .build();
                String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

                if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
                }

                ExchangeServer server;
                try {
                    // Register requestHandler as the handler for incoming request messages
                    server = Exchangers.bind(url, requestHandler);
                } catch (RemotingException e) {
                    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
                }

                str = url.getParameter(CLIENT_KEY);
                if (str != null && str.length() > 0) {
                    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                    if (!supportedTypes.contains(str)) {
                        throw new RpcException("Unsupported client type: " + str);
                    }
                }

                return new DubboProtocolServer(server);
            }

            // Implements the service reference method inherited from AbstractProtocol
            @Override
            public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
                optimizeSerialization(url);

                // create rpc invoker.
                // getClients(url) creates the client(s)
                DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
                invokers.add(invoker);

                return invoker;
            }

            private ExchangeClient[] getClients(URL url) {
                boolean useShareConnect = false;
                int connections = url.getParameter(CONNECTIONS_KEY, 0);
                List<ReferenceCountExchangeClient> shareClients = null;
                // if not configured, connection is shared, otherwise, one connection for one service
                if (connections == 0) {
                    useShareConnect = true;

                    String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
                    connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                            DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                    // Normally a referenced service starts only one client [long-lived TCP connection]
                    shareClients = getSharedClient(url, connections);
                }

                // The connections parameter controls how many clients [long-lived TCP connections] are started
                ExchangeClient[] clients = new ExchangeClient[connections];
                for (int i = 0; i < clients.length; i++) {
                    if (useShareConnect) {
                        clients[i] = shareClients.get(i);

                    } else {
                        clients[i] = initClient(url);
                    }
                }

                return clients;
            }

            private ExchangeClient initClient(URL url) {

                // client type setting.
                String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

                url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
                // enable heartbeat by default
                url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

                // BIO is not allowed since it has severe performance issue.
                if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                    throw new RpcException("Unsupported client type: " + str + "," +
                            " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
                }

                ExchangeClient client;
                try {
                    // connection should be lazy
                    if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                        client = new LazyConnectExchangeClient(url, requestHandler);

                    } else {
                        // Creating a client also requires a requestHandler.
                        // Here requestHandler handles callback requests [the provider calling back into the consumer]
                        client = Exchangers.connect(url, requestHandler);
                    }

                } catch (RemotingException e) {
                    throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
                }

                return client;
            }
            // ...
        }
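
Two decisions in the listing above are easy to isolate: one server per address, and shared versus dedicated client connections. The sketch below restates both under simplifying assumptions. `ServerCache` and `ConnectionPlanner` are hypothetical names, servers are represented by strings, and `computeIfAbsent` is swapped in for the listing's double-checked `synchronized` block because it gives the same create-once guarantee in less code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cache: at most one server is started per address. */
final class ServerCache {
    private final Map<String, String> serverMap = new ConcurrentHashMap<>();
    private final AtomicInteger started = new AtomicInteger();

    /** Returns the server for the address, starting it only on the first call. */
    String openServer(String address) {
        return serverMap.computeIfAbsent(address,
                addr -> "server-" + started.incrementAndGet() + "@" + addr);
    }

    int startedCount() {
        return started.get();
    }
}

/** Hypothetical helper mirroring the branch at the top of getClients. */
final class ConnectionPlanner {
    /** connections == 0 means "not configured", so the shared connection is used. */
    static boolean useSharedConnection(int connections) {
        return connections == 0;
    }

    /** Number of clients to hold: the shared count when unconfigured, else the configured count. */
    static int clientCount(int connections, int shareConnections) {
        return useSharedConnection(connections) ? shareConnections : connections;
    }
}

final class DubboConnectionDemo {
    public static void main(String[] args) {
        ServerCache cache = new ServerCache();
        cache.openServer("127.0.0.1:20880");
        cache.openServer("127.0.0.1:20880"); // second export on the same address reuses the server
        System.out.println("servers started: " + cache.startedCount());
        System.out.println("clients when unconfigured: " + ConnectionPlanner.clientCount(0, 1));
        System.out.println("clients with connections=3: " + ConnectionPlanner.clientCount(3, 1));
    }
}
```

Exporting many services on one port therefore costs one server, and referencing a service costs one shared long-lived connection unless the `connections` parameter asks for more.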


        Multiple connections, short-lived connections, synchronous transport, HTTP | TCP

        • HttpProtocol, HessianProtocol, RestProtocol, RmiProtocol, WebServiceProtocol, XmlRpcProtocol [subclasses of the proxy protocol]

        • Each protocol class creates and starts a server, creates a client, and specifies a request handler

        • Both the server and the client are built from open-source components

        • When creating the server or client object, each proxy subclass depends on the interface implementation [or an interface proxy implementation]

        • export and refer in the Protocol interface take or return invoker objects

        • export and refer in AbstractProxyProtocol convert between interface proxy objects and invokers

        • These protocols use general-purpose serialization [json or hessian]

        • RmiProtocol runs over TCP; the other protocols run over HTTP

        • Dubbo uses jetty as its default HTTP web server

        • No callback handling

        • Source example

        public class HttpProtocol extends AbstractProxyProtocol {
            public static final String ACCESS_CONTROL_ALLOW_ORIGIN_HEADER = "Access-Control-Allow-Origin";
            public static final String ACCESS_CONTROL_ALLOW_METHODS_HEADER = "Access-Control-Allow-Methods";
            public static final String ACCESS_CONTROL_ALLOW_HEADERS_HEADER = "Access-Control-Allow-Headers";

            private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();

            private HttpBinder httpBinder;

            public HttpProtocol() {
                super(HttpException.class, JsonRpcClientException.class);
            }

            public void setHttpBinder(HttpBinder httpBinder) {
                this.httpBinder = httpBinder;
            }

            @Override
            public int getDefaultPort() {
                return 80;
            }

            // Message handler
            private class InternalHandler implements HttpHandler {

                private boolean cors;

                public InternalHandler(boolean cors) {
                    this.cors = cors;
                }

                @Override
                public void handle(HttpServletRequest request, HttpServletResponse response)
                        throws ServletException {
                    String uri = request.getRequestURI();
                    JsonRpcServer skeleton = skeletonMap.get(uri);
                    if (cors) {
                        response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
                        response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
                        response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
                    }
                    if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
                        response.setStatus(200);
                    } else if (request.getMethod().equalsIgnoreCase("POST")) {

                        RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
                        try {
                            // skeleton: com.googlecode.jsonrpc4j.JsonRpcServer
                            // Uses JSON serialization
                            skeleton.handle(request.getInputStream(), response.getOutputStream());
                        } catch (Throwable e) {
                            throw new ServletException(e);
                        }
                    } else {
                        response.setStatus(500);
                    }
                }

            }

            @Override
            protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
                String addr = getAddr(url);
                ProtocolServer protocolServer = serverMap.get(addr);
                if (protocolServer == null) {
                    // Register the message handler
                    RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
                    serverMap.put(addr, new ProxyProtocolServer(remotingServer));
                }
                final String path = url.getAbsolutePath();
                final String genericPath = path + "/" + GENERIC_KEY;
                // Create the server-side skeleton
                JsonRpcServer skeleton = new JsonRpcServer(impl, type);
                // Creating a com.googlecode.jsonrpc4j.JsonRpcServer requires the RPC interface implementation [or proxy] object
                JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
                skeletonMap.put(path, skeleton);
                skeletonMap.put(genericPath, genericServer);
                return () -> {
                    skeletonMap.remove(path);
                    skeletonMap.remove(genericPath);
                };
            }

            @SuppressWarnings("unchecked")
            @Override
            protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
                final String generic = url.getParameter(GENERIC_KEY);
                final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
                /********* begin: creating the proxy object with com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean [rpc-client] **************/
                JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
                JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
                jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
                    RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
                    if (isGeneric) {
                        invocation.addAttribute(GENERIC_KEY, generic);
                    }
                    return invocation;
                });
                String key = url.setProtocol("http").toIdentityString();
                if (isGeneric) {
                    key = key + "/" + GENERIC_KEY;
                }

                jsonRpcProxyFactoryBean.setServiceUrl(key);
                // Set the service interface
                jsonRpcProxyFactoryBean.setServiceInterface(serviceType);

                jsonProxyFactoryBean.afterPropertiesSet();
                // Create and return the client proxy, which implements the RPC interface.
                // Protocol#refer returns an Invoker,
                // so AbstractProxyProtocol converts this proxy into an Invoker object.
                // [DubboProtocol's reference method does not create an interface proxy object]
                return (T) jsonProxyFactoryBean.getObject();
                /********* end: creating the proxy object with com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean [rpc-client] **************/
            }
            // ...
        }
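
The export side of HttpProtocol boils down to a path-keyed map of skeletons plus a `Runnable` that undoes the registration. The following sketch shows that mechanism without any servlet or jsonrpc4j dependency. `PathDispatcher` is a hypothetical name, skeletons are reduced to `Function<String, String>`, responses are reduced to strings, and the `404` branch for an unknown path is an assumption: the listing above does not null-check the skeleton.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical reduction of skeletonMap plus InternalHandler. */
final class PathDispatcher {
    private final Map<String, Function<String, String>> skeletonMap = new ConcurrentHashMap<>();

    /** Registers a skeleton under a path and returns the action that unregisters it. */
    Runnable export(String path, Function<String, String> skeleton) {
        skeletonMap.put(path, skeleton);
        return () -> skeletonMap.remove(path);
    }

    /** OPTIONS answers 200, POST is dispatched by URI, anything else answers 500. */
    String handle(String method, String uri, String body) {
        if ("OPTIONS".equalsIgnoreCase(method)) {
            return "200";
        }
        if (!"POST".equalsIgnoreCase(method)) {
            return "500";
        }
        Function<String, String> skeleton = skeletonMap.get(uri);
        if (skeleton == null) {
            return "404"; // assumption: not part of the original handler
        }
        return "200 " + skeleton.apply(body);
    }
}

final class PathDispatcherDemo {
    public static void main(String[] args) {
        PathDispatcher dispatcher = new PathDispatcher();
        Runnable unexport = dispatcher.export("/demo.EchoService", body -> "echo:" + body);
        System.out.println(dispatcher.handle("POST", "/demo.EchoService", "hi"));
        unexport.run();
        System.out.println(dispatcher.handle("POST", "/demo.EchoService", "hi"));
    }
}
```

Returning the cleanup action from `export` is what lets AbstractProxyProtocol's `unexport` stay generic: it runs the `Runnable` without knowing which map the subclass used.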

        • HessianProtocol source

        public class HessianProtocol extends AbstractProxyProtocol {

            private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();

            private HttpBinder httpBinder;

            public HessianProtocol() {
                super(HessianException.class);
            }

            public void setHttpBinder(HttpBinder httpBinder) {
                this.httpBinder = httpBinder;
            }

            @Override
            public int getDefaultPort() {
                return 80;
            }

            // Message handler
            private class HessianHandler implements HttpHandler {

                @Override
                public void handle(HttpServletRequest request, HttpServletResponse response)
                        throws IOException, ServletException {
                    String uri = request.getRequestURI();
                    // com.caucho.hessian.server.HessianSkeleton
                    HessianSkeleton skeleton = skeletonMap.get(uri);
                    if (!"POST".equalsIgnoreCase(request.getMethod())) {
                        response.setStatus(500);
                    } else {
                        RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());

                        Enumeration<String> enumeration = request.getHeaderNames();
                        while (enumeration.hasMoreElements()) {
                            String key = enumeration.nextElement();
                            if (key.startsWith(DEFAULT_EXCHANGER)) {
                                RpcContext.getContext().setAttachment(key.substring(DEFAULT_EXCHANGER.length()),
                                        request.getHeader(key));
                            }
                        }

                        try {
                            skeleton.invoke(request.getInputStream(), response.getOutputStream());
                        } catch (Throwable e) {
                            throw new ServletException(e);
                        }
                    }
                }

            }

            @Override
            protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
                String addr = getAddr(url);
                ProtocolServer protocolServer = serverMap.get(addr);
                if (protocolServer == null) {
                    // Register the message handler
                    // and create the server
                    RemotingServer remotingServer = httpBinder.bind(url, new HessianHandler());
                    serverMap.put(addr, new ProxyProtocolServer(remotingServer));
                }
                final String path = url.getAbsolutePath();
                // Core provider-side class of the hessian framework: com.caucho.hessian.server.HessianSkeleton
                // Creating a com.caucho.hessian.server.HessianSkeleton requires the RPC interface implementation [or proxy] object
                final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
                skeletonMap.put(path, skeleton);

                final String genericPath = path + "/" + GENERIC_KEY;
                skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));

                return new Runnable() {
                    @Override
                    public void run() {
                        skeletonMap.remove(path);
                        skeletonMap.remove(genericPath);
                    }
                };
            }

            @Override
            @SuppressWarnings("unchecked")
            protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
                String generic = url.getParameter(GENERIC_KEY);
                boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
                if (isGeneric) {
                    RpcContext.getContext().setAttachment(GENERIC_KEY, generic);
                    url = url.setPath(url.getPath() + "/" + GENERIC_KEY);
                }
                /********* begin: creating the proxy object with com.caucho.hessian.client.HessianProxyFactory [rpc-client] **************/
                HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
                boolean isHessian2Request = url.getParameter(HESSIAN2_REQUEST_KEY, DEFAULT_HESSIAN2_REQUEST);
                hessianProxyFactory.setHessian2Request(isHessian2Request);
                boolean isOverloadEnabled = url.getParameter(HESSIAN_OVERLOAD_METHOD_KEY, DEFAULT_HESSIAN_OVERLOAD_METHOD);
                hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
                String client = url.getParameter(CLIENT_KEY, DEFAULT_HTTP_CLIENT);
                if ("httpclient".equals(client)) {
                    HessianConnectionFactory factory = new HttpClientConnectionFactory();
                    factory.setHessianProxyFactory(hessianProxyFactory);
                    hessianProxyFactory.setConnectionFactory(factory);
                } else if (client != null && client.length() > 0 && !DEFAULT_HTTP_CLIENT.equals(client)) {
                    throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
                } else {
                    HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
                    factory.setHessianProxyFactory(hessianProxyFactory);
                    hessianProxyFactory.setConnectionFactory(factory);
                }
                int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                hessianProxyFactory.setConnectTimeout(timeout);
                hessianProxyFactory.setReadTimeout(timeout);
                // Create and return the client proxy, which implements the RPC interface.
                // Protocol#refer returns an Invoker,
                // so AbstractProxyProtocol converts this proxy into an Invoker object.
                // [DubboProtocol's reference method does not create an interface proxy object]
                return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
                /********* end: creating the proxy object with com.caucho.hessian.client.HessianProxyFactory [rpc-client] **************/
            }

            // ...
        }
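
One detail of HessianHandler worth isolating is how request headers become RPC attachments: every header whose name starts with a fixed prefix is copied, with the prefix stripped. A small sketch of that step follows. `HeaderAttachments` is a hypothetical helper, headers are modelled as a plain `Map`, and the prefix value `"header"` used in the demo is an assumption about what `DEFAULT_EXCHANGER` resolves to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mirroring the header loop in HessianHandler. */
final class HeaderAttachments {
    /** Copies headers that start with the prefix, keyed by the name without the prefix. */
    static Map<String, String> extract(Map<String, String> headers, String prefix) {
        Map<String, String> attachments = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                attachments.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return attachments;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Content-Type", "x-application/hessian");
        headers.put("headertraceId", "42"); // assumed prefix: "header"
        System.out.println(extract(headers, "header"));
    }
}
```

Headers without the prefix, such as `Content-Type`, stay out of the attachments, so ordinary HTTP metadata does not leak into the RPC context.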

        Service reference only, no service export

        • RedisProtocol, MemcachedProtocol [non-proxy protocols]

        • Both rely on a third-party [cache] server

        • The consumer can only call the three methods get(k), set(k,v), and delete(k) to interact with redis | memcache

        • These protocols merely wrap the basic cache operations, so their practical value seems limited

        • RedisProtocol source [the MemcachedProtocol code is similar]

        public class RedisProtocol extends AbstractProtocol {

            public static final int DEFAULT_PORT = 6379;
            // ...

            @Override
            protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
                try {
                    // Build the connection pool config; optional settings are applied only when the URL sets them (> 0)
                    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
                    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
                    config.setTestOnReturn(url.getParameter("test.on.return", false));
                    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
                    if (url.getParameter("max.idle", 0) > 0) {
                        config.setMaxIdle(url.getParameter("max.idle", 0));
                    }
                    if (url.getParameter("min.idle", 0) > 0) {
                        config.setMinIdle(url.getParameter("min.idle", 0));
                    }
                    if (url.getParameter("max.active", 0) > 0) {
                        config.setMaxTotal(url.getParameter("max.active", 0));
                    }
                    if (url.getParameter("max.total", 0) > 0) {
                        config.setMaxTotal(url.getParameter("max.total", 0));
                    }
                    if (url.getParameter("max.wait", 0) > 0) {
                        config.setMaxWaitMillis(url.getParameter("max.wait", 0));
                    }
                    if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
                        config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
                    }
                    if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
                        config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
                    }
                    if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
                        config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
                    }
                    final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
                            url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
                            StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(),
                            url.getParameter("db.index", 0));
                    final int expiry = url.getParameter("expiry", 0);
                    // The name of the get-like method can be overridden
                    final String get = url.getParameter("get", "get");
                    // The name of the set-like method can be overridden
                    final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
                    // The name of the delete-like method can be overridden
                    final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
                    return new AbstractInvoker<T>(type, url) {
                        @Override
                        protected Result doInvoke(Invocation invocation) throws Throwable {
                            Jedis jedis = null;
                            try {
                                jedis = jedisPool.getResource();

                                if (get.equals(invocation.getMethodName())) {
                                    // Anything other than get(k) is an error
                                    if (invocation.getArguments().length != 1) {
                                        throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                    }
                                    byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
                                    if (value == null) {
                                        return AsyncRpcResult.newDefaultAsyncResult(invocation);
                                    }
                                    ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
                                    return AsyncRpcResult.newDefaultAsyncResult(oin.readObject(), invocation);
                                } else if (set.equals(invocation.getMethodName())) {
                                    // Anything other than set(k, v) is an error
                                    if (invocation.getArguments().length != 2) {
                                        throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                    }
                                    byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
                                    ByteArrayOutputStream output = new ByteArrayOutputStream();
                                    ObjectOutput value = getSerialization(url).serialize(url, output);
                                    value.writeObject(invocation.getArguments()[1]);
                                    jedis.set(key, output.toByteArray());
                                    // expiry is divided by 1000 before being passed to expire()
                                    if (expiry > 1000) {
                                        jedis.expire(key, expiry / 1000);
                                    }
                                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                                } else if (delete.equals(invocation.getMethodName())) {
                                    // Anything other than delete(k) is an error
                                    if (invocation.getArguments().length != 1) {
                                        throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                                    }
                                    jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
                                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                                } else {
                                    // No other method is supported
                                    throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
                                }
                            } catch (Throwable t) {
                                // Map the underlying failure to an RpcException error code
                                RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
                                if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                                    re.setCode(RpcException.TIMEOUT_EXCEPTION);
                                } else if (t instanceof JedisConnectionException || t instanceof IOException) {
                                    re.setCode(RpcException.NETWORK_EXCEPTION);
                                } else if (t instanceof JedisDataException) {
                                    re.setCode(RpcException.SERIALIZATION_EXCEPTION);
                                }
                                throw re;
                            } finally {
                                // Always return the connection to the pool
                                if (jedis != null) {
                                    try {
                                        jedis.close();
                                    } catch (Throwable t) {
                                        logger.warn("returnResource error: " + t.getMessage(), t);
                                    }
                                }
                            }
                        }

                        @Override
                        public void destroy() {
                            super.destroy();
                            try {
                                jedisPool.destroy();
                            } catch (Throwable e) {
                                logger.warn(e.getMessage(), e);
                            }
                        }
                    };
                } catch (Throwable t) {
                    throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
                }
            }
        }
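The dispatch logic in doInvoke can be hard to see among the pool and serialization details. Below is a minimal sketch of just that part, under clear assumptions: `CacheDispatchSketch` and `requireArgs` are hypothetical names invented for illustration, a `HashMap` stands in for the Jedis connection, and serialization, expiry and error-code mapping are left out. It is not Dubbo code, only an approximation of how a method name is matched against the configured get/set/delete names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the anonymous AbstractInvoker returned by RedisProtocol. */
class CacheDispatchSketch {
    // In-memory map used instead of a real Jedis connection (assumption for the sketch).
    private final Map<String, Object> store = new HashMap<>();
    private final String get;
    private final String set;
    private final String delete;

    CacheDispatchSketch(Class<?> type) {
        // Same defaults as the listing: Map-style names when the interface is java.util.Map.
        this.get = "get";
        this.set = Map.class.equals(type) ? "put" : "set";
        this.delete = Map.class.equals(type) ? "remove" : "delete";
    }

    /** Routes a call by method name; anything other than get/set/delete is rejected. */
    Object invoke(String methodName, Object... args) {
        if (get.equals(methodName)) {
            requireArgs(methodName, args, 1);
            return store.get(String.valueOf(args[0]));
        } else if (set.equals(methodName)) {
            requireArgs(methodName, args, 2);
            store.put(String.valueOf(args[0]), args[1]);
            return null;
        } else if (delete.equals(methodName)) {
            requireArgs(methodName, args, 1);
            store.remove(String.valueOf(args[0]));
            return null;
        }
        throw new UnsupportedOperationException("Unsupported method " + methodName + " in cache service.");
    }

    /** Rejects calls whose argument count does not match the expected cache operation. */
    private static void requireArgs(String methodName, Object[] args, int expected) {
        if (args.length != expected) {
            throw new IllegalArgumentException(
                    methodName + " expects " + expected + " argument(s), got " + args.length);
        }
    }
}
```

With `new CacheDispatchSketch(Map.class)`, calls named `put`, `get` and `remove` are accepted, while a call such as `size` ends in an `UnsupportedOperationException`. This mirrors why a consumer of this protocol is limited to three operations.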

        Protocols that extend third-party RPC services

        • GrpcProtocol, ThriftProtocol [subclasses of the proxy protocol]

        • [Not studied in depth yet; a few concepts are pasted here]

        • gRPC is a high-performance, open-source, general-purpose RPC framework designed with mobile and HTTP/2 in mind

        • Thrift is an interface description language and binary communication protocol used to define and create cross-language services. It is used as a remote procedure call (RPC) framework and was developed at Facebook for "scalable cross-language services development".

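        Non-RPC-layer protocols

        • QosProtocolWrapper: used for service health checks and quality probing

        • RegistryProtocol: used during the registration phase; analyzed in detail in the service registration article

        • ProtocolFilterWrapper: the wrapper class that applies filters to a protocol; see the material on Dubbo filter loading and the extension Wrapper mechanism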
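The wrapper classes listed above all follow the same decorator idea: a wrapper holds another protocol and adds behaviour around the delegated call. The sketch below illustrates only that idea. `SimpleProtocol`, `CoreProtocol` and `LoggingProtocolWrapper` are hypothetical names made up for this example, and the single `export(String)` method is a deliberate simplification, not the real `org.apache.dubbo.rpc.Protocol` signature.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified stand-in for a protocol interface. */
interface SimpleProtocol {
    String export(String service);
}

/** Plays the role of a concrete protocol that does the actual work. */
class CoreProtocol implements SimpleProtocol {
    @Override
    public String export(String service) {
        return "exported:" + service;
    }
}

/** Plays the role of a wrapper: takes another protocol and adds steps around it. */
class LoggingProtocolWrapper implements SimpleProtocol {
    private final SimpleProtocol delegate;
    // Records what happened around each delegated call.
    final List<String> log = new ArrayList<>();

    LoggingProtocolWrapper(SimpleProtocol delegate) {
        this.delegate = delegate;
    }

    @Override
    public String export(String service) {
        log.add("before export " + service);
        String result = delegate.export(service); // hand over to the wrapped protocol
        log.add("after export " + service);
        return result;
    }
}
```

Calling `new LoggingProtocolWrapper(new CoreProtocol()).export("demo")` returns the inner protocol's result while the wrapper records its own steps before and after. Several wrappers can be nested the same way, so each one stays small and unaware of the others.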

