1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        100行代碼透徹解析RPC原理

        共 18550字,需瀏覽 38分鐘

         ·

        2022-01-17 01:39

        本文主要論述的是“RPC 實現(xiàn)原理”,那么首先明確一個問題什么是 RPC 呢?RPC 是 Remote Procedure Call 的縮寫,即,遠程過程調(diào)用。RPC 是一個計算機通信協(xié)議。該協(xié)議允許運行于一臺計算機的程序調(diào)用另一臺計算機的子程序,而開發(fā)人員無需額外地為這個交互編程。

        值得注意是,兩個或多個應用程序都分布在不同的服務器上,它們之間的調(diào)用都像是本地方法調(diào)用一樣。接下來我們便來分析一下一次 RPC 調(diào)用發(fā)生了些什么?


        ?
        1?
        RPC 調(diào)用的基本流程


        現(xiàn)在業(yè)界內(nèi)比較流行的一些 RPC 框架,例如 Dubbo 提供的是基于接口的遠程方法調(diào)用,即客戶端只需要知道接口的定義即可調(diào)用遠程服務。在 Java 中接口并不能直接調(diào)用實例方法,必須通過其實現(xiàn)類對象來完成此操作,這意味著客戶端必須為這些接口生成代理對象,對此 Java 提供了 Proxy、InvocationHandler 生成動態(tài)代理的支持;生成了代理對象,那么每個具體的發(fā)方法是怎么調(diào)用的呢?JDK 動態(tài)代理生成的代理對象調(diào)用指定方法時實際會執(zhí)行 InvocationHandler 中定義的?#invoke 方法,在該方法中完成遠程方法調(diào)用并獲取結(jié)果。

        拋開客戶端,回過頭來看 RPC 是兩臺計算機間的調(diào)用,實質(zhì)上是兩臺主機間的網(wǎng)絡通信,涉及到網(wǎng)絡通信又必然會有序列化、反序列化,編解碼等一些必須要考慮的問題;同時實際上現(xiàn)在大多系統(tǒng)都是集群部署的,多臺主機/容器對外提供相同的服務,如果集群的節(jié)點數(shù)量很大的話,那么管理服務地址也將是一件十分繁瑣的事情,常見的做法是各個服務節(jié)點將自己的地址和提供的服務列表注冊到一個注冊中心,由注冊中心來統(tǒng)一管理服務列表;這樣的做法解決了一些問題同時為客戶端增加了一項新的工作——那就是服務發(fā)現(xiàn),通俗來說就是從注冊中心中找到遠程方法對應的服務列表并通過某種策略從中選取一個服務地址來完成網(wǎng)絡通信。

        聊了客戶端和注冊中心,另外一個重要的角色自然是服務端,服務端最重要的任務便是提供服務接口的真正實現(xiàn)并在某個端口上監(jiān)聽網(wǎng)絡請求,監(jiān)聽到請求后從網(wǎng)絡請求中獲取到對應的參數(shù)(比如服務接口、方法、請求參數(shù)等),再根據(jù)這些參數(shù)通過反射的方式調(diào)用接口的真正實現(xiàn)獲取結(jié)果并將其寫入對應的響應流中。

        綜上所述,一次基本的 RPC 調(diào)用流程大致如下:


        ?
        2?
        基本實現(xiàn)


        服務端(生產(chǎn)者)

        服務接口

        在 RPC 中,生產(chǎn)者和消費者有一個共同的服務接口 API。如下,定義一個 HelloService 接口。

        /**
        ?*?@author?孫浩
        ?*?@Descrption??服務接口
        ?***/
        public?interface?HelloService?{
        ????String?sayHello(String?somebody);
        }

        服務實現(xiàn)

        生產(chǎn)者要提供服務接口的實現(xiàn),創(chuàng)建 HelloServiceImpl 實現(xiàn)類。

        /**
        ?*?@author?孫浩
        ?*?@Descrption?服務實現(xiàn)
        ?***/
        public?class?HelloServiceImpl?implements?HelloService?{
        ????@Override
        ????public?String?sayHello(String?somebody)?{
        ????????return?"hello?"?+?somebody?+?"!";
        ????}
        }

        服務注冊

        本例使用 Spring 來管理 bean,采用自定義 XML 和解析器的方式來將服務實現(xiàn)類載入容器(當然也可以采用自定義注解的方式,此處不過多論述)并將服務接口信息注冊到注冊中心。

        首先自定義 XSD:

        "service">
        ????
        ????????
        ????????????"beans:identifiedType">
        ????????????????"interface"?type="xsd:string"?use="required"/>
        ????????????????"timeout"?type="xsd:int"?use="required"/>
        ????????????????"serverPort"?type="xsd:int"?use="required"/>
        ????????????????"ref"?type="xsd:string"?use="required"/>
        ????????????????"weight"?type="xsd:int"?use="optional"/>
        ????????????????"workerThreads"?type="xsd:int"?use="optional"/>
        ????????????????"appKey"?type="xsd:string"?use="required"/>
        ????????????????"groupName"?type="xsd:string"?use="optional"/>
        ????????????
        ????????

        ????

        分別指定 Schema 和 XSD,Schema 和對應 Handler 的映射。

        Schema:

        http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
        http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd

        Handler:

        http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
        http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler

        將編寫好的文件放入 Classpath 下的 META-INF 目錄下:


        在 Spring 配置文件中配置服務類:


        ?"helloService"?class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
        ?"helloServiceRegister"
        ?????????????????????interface="com.hsunfkqm.storm.framework.test.HelloService"
        ?????????????????????ref="helloService"
        ?????????????????????groupName="default"
        ?????????????????????weight="2"
        ?????????????????????appKey="ares"
        ?????????????????????workerThreads="100"
        ?????????????????????serverPort="8081"
        ?????????????????????timeout="600"/>

        編寫對應的 Handler 和 Parser:

        StormServiceNamespaceHandler:

        import?org.springframework.beans.factory.xml.NamespaceHandlerSupport;

        /**
        ?*?@author?孫浩
        ?*?@Descrption?服務發(fā)布自定義標簽
        ?***/
        public?class?StormServiceNamespaceHandler?extends?NamespaceHandlerSupport?{
        ????@Override
        ????public?void?init()?{
        ????????registerBeanDefinitionParser("service",?new?ProviderFactoryBeanDefinitionParser());
        ????}
        }

        ProviderFactoryBeanDefinitionParser:

        protected?Class?getBeanClass(Element?element)?{
        ????????return?ProviderFactoryBean.class;
        ????}

        ????protected?void?doParse(Element?element,?BeanDefinitionBuilder?bean)?{

        ????????try?{
        ????????????String?serviceItf?=?element.getAttribute("interface");
        ????????????String?serverPort?=?element.getAttribute("serverPort");
        ????????????String?ref?=?element.getAttribute("ref");
        ????????????//?....
        ????????????bean.addPropertyValue("serverPort",?Integer.parseInt(serverPort));
        ????????????bean.addPropertyValue("serviceItf",?Class.forName(serviceItf));
        ????????????bean.addPropertyReference("serviceObject",?ref);
        ????????????//...
        ????????????if?(NumberUtils.isNumber(weight))?{
        ????????????????bean.addPropertyValue("weight",?Integer.parseInt(weight));
        ????????????}
        ????????????//...
        ???????}?catch?(Exception?e)?{
        ????????????//?...????????
        ??????}
        ????}

        ProviderFactoryBean:

        /**
        ?*?@author?孫浩
        ?*?@Descrption?服務發(fā)布
        ?***/
        public?class?ProviderFactoryBean?implements?FactoryBean,?InitializingBean?{

        ????//服務接口
        ????private?Class?serviceItf;
        ????//服務實現(xiàn)
        ????private?Object?serviceObject;
        ????//服務端口
        ????private?String?serverPort;
        ????//服務超時時間
        ????private?long?timeout;
        ????//服務代理對象,暫時沒有用到
        ????private?Object?serviceProxyObject;
        ????//服務提供者唯一標識
        ????private?String?appKey;
        ????//服務分組組名
        ????private?String?groupName?=?"default";
        ????//服務提供者權(quán)重,默認為?1?,?范圍為?[1-100]
        ????private?int?weight?=?1;
        ????//服務端線程數(shù),默認?10?個線程
        ????private?int?workerThreads?=?10;

        ????@Override
        ????public?Object?getObject()?throws?Exception?{
        ????????return?serviceProxyObject;
        ????}

        ????@Override
        ????public?Class?getObjectType()?{
        ????????return?serviceItf;
        ????}

        ????@Override
        ????public?void?afterPropertiesSet()?throws?Exception?{
        ????????//啟動?Netty?服務端
        ????????NettyServer.singleton().start(Integer.parseInt(serverPort));
        ????????//注冊到?zk,?元數(shù)據(jù)注冊中心
        ????????List?providerServiceList?=?buildProviderServiceInfos();
        ????????IRegisterCenter4Provider?registerCenter4Provider?=?RegisterCenter.singleton();
        ????????registerCenter4Provider.registerProvider(providerServiceList);
        ????}
        }

        //================RegisterCenter#registerProvider======================
        @Override
        public?void?registerProvider(final?List?serviceMetaData)?{
        ????if?(CollectionUtils.isEmpty(serviceMetaData))?{
        ????????return;
        ????}

        ????//連接?zk,?注冊服務
        ????synchronized?(RegisterCenter.class)?{
        ????????for?(ProviderService?provider?:?serviceMetaData)?{
        ????????????String?serviceItfKey?=?provider.getServiceItf().getName();

        ????????????List?providers?=?providerServiceMap.get(serviceItfKey);
        ????????????if?(providers?==?null)?{
        ????????????????providers?=?Lists.newArrayList();
        ????????????}
        ????????????providers.add(provider);
        ????????????providerServiceMap.put(serviceItfKey,?providers);
        ????????}

        ????????if?(zkClient?==?null)?{
        ????????????zkClient?=?new?ZkClient(ZK_SERVICE,?ZK_SESSION_TIME_OUT,?ZK_CONNECTION_TIME_OUT,?new?SerializableSerializer());
        ????????}

        ????????//創(chuàng)建?ZK?命名空間/當前部署應用?APP?命名空間/
        ????????String?APP_KEY?=?serviceMetaData.get(0).getAppKey();
        ????????String?ZK_PATH?=?ROOT_PATH?+?"/"?+?APP_KEY;
        ????????boolean?exist?=?zkClient.exists(ZK_PATH);
        ????????if?(!exist)?{
        ????????????zkClient.createPersistent(ZK_PATH,?true);
        ????????}

        ????????for?(Map.Entry>?entry?:?providerServiceMap.entrySet())?{
        ????????????//服務分組
        ????????????String?groupName?=?entry.getValue().get(0).getGroupName();
        ????????????//創(chuàng)建服務提供者
        ????????????String?serviceNode?=?entry.getKey();
        ????????????String?servicePath?=?ZK_PATH?+?"/"?+?groupName?+?"/"?+?serviceNode?+?"/"?+?PROVIDER_TYPE;
        ????????????exist?=?zkClient.exists(servicePath);
        ????????????if?(!exist)?{
        ????????????????zkClient.createPersistent(servicePath,?true);
        ????????????}

        ????????????//創(chuàng)建當前服務器節(jié)點
        ????????????int?serverPort?=?entry.getValue().get(0).getServerPort();//服務端口
        ????????????int?weight?=?entry.getValue().get(0).getWeight();//服務權(quán)重
        ????????????int?workerThreads?=?entry.getValue().get(0).getWorkerThreads();//服務工作線程
        ????????????String?localIp?=?IPHelper.localIp();
        ????????????String?currentServiceIpNode?=?servicePath?+?"/"?+?localIp?+?"|"?+?serverPort?+?"|"?+?weight?+?"|"?+?workerThreads?+?"|"?+?groupName;
        ????????????exist?=?zkClient.exists(currentServiceIpNode);
        ????????????if?(!exist)?{
        ????????????????//注意,這里創(chuàng)建的是臨時節(jié)點
        ????????????????zkClient.createEphemeral(currentServiceIpNode);
        ????????????}
        ????????????//監(jiān)聽注冊服務的變化,同時更新數(shù)據(jù)到本地緩存
        ????????????zkClient.subscribeChildChanges(servicePath,?new?IZkChildListener()?{
        ????????????????@Override
        ????????????????public?void?handleChildChange(String?parentPath,?List?currentChilds)?throws?Exception?{
        ????????????????????if?(currentChilds?==?null)?{
        ????????????????????????currentChilds?=?Lists.newArrayList();
        ????????????????????}
        ????????????????????//存活的服務?IP?列表
        ????????????????????List?activityServiceIpList?=?Lists.newArrayList(Lists.transform(currentChilds,?new?Function()?{
        ????????????????????????@Override
        ????????????????????????public?String?apply(String?input)?{
        ????????????????????????????return?StringUtils.split(input,?"|")[0];
        ????????????????????????}
        ????????????????????}));
        ????????????????????refreshActivityService(activityServiceIpList);
        ????????????????}
        ????????????});

        ????????}
        ????}
        }

        至此服務實現(xiàn)類已被載入 Spring 容器中,且服務接口信息也注冊到了注冊中心。

        網(wǎng)絡通信

        作為生產(chǎn)者對外提供 RPC 服務,必須有一個網(wǎng)絡程序來來監(jiān)聽請求和做出響應。在 Java 領(lǐng)域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 來實現(xiàn)的,本例中也采用它當做通信服務器。

        構(gòu)建并啟動 Netty 服務監(jiān)聽指定端口:

        public?void?start(final?int?port)?{
        ????????synchronized?(NettyServer.class)?{
        ????????????if?(bossGroup?!=?null?||?workerGroup?!=?null)?{
        ????????????????return;
        ????????????}

        ????????????bossGroup?=?new?NioEventLoopGroup();
        ????????????workerGroup?=?new?NioEventLoopGroup();
        ????????????ServerBootstrap?serverBootstrap?=?new?ServerBootstrap();
        ????????????serverBootstrap
        ????????????????????.group(bossGroup,?workerGroup)
        ????????????????????.channel(NioServerSocketChannel.class)
        ????????????????????.option(ChannelOption.SO_BACKLOG,?1024)
        ????????????????????.childOption(ChannelOption.SO_KEEPALIVE,?true)
        ????????????????????.childOption(ChannelOption.TCP_NODELAY,?true)
        ????????????????????.handler(new?LoggingHandler(LogLevel.INFO))
        ????????????????????.childHandler(new?ChannelInitializer()?{
        ????????????????????????@Override
        ????????????????????????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
        ????????????????????????????//注冊解碼器?NettyDecoderHandler
        ????????????????????????????ch.pipeline().addLast(new?NettyDecoderHandler(StormRequest.class,?serializeType));
        ????????????????????????????//注冊編碼器?NettyEncoderHandler
        ????????????????????????????ch.pipeline().addLast(new?NettyEncoderHandler(serializeType));
        ????????????????????????????//注冊服務端業(yè)務邏輯處理器?NettyServerInvokeHandler
        ????????????????????????????ch.pipeline().addLast(new?NettyServerInvokeHandler());
        ????????????????????????}
        ????????????????????});
        ????????????try?{
        ????????????????channel?=?serverBootstrap.bind(port).sync().channel();
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????throw?new?RuntimeException(e);
        ????????????}
        ????????}
        ????}

        上面的代碼中向 Netty 服務的 Pipeline 中添加了編解碼和業(yè)務處理器,當接收到請求時,經(jīng)過編解碼后,真正處理業(yè)務的是業(yè)務處理器,即 NettyServerInvokeHandler,該處理器繼承自 SimpleChannelInboundHandler,當數(shù)據(jù)讀取完成將觸發(fā)一個事件,并調(diào)用 NettyServerInvokeHandler#channelRead0 方法來處理請求。

        @Override
        protected?void?channelRead0(ChannelHandlerContext?ctx,?StormRequest?request)?throws?Exception?{
        ????if?(ctx.channel().isWritable())?{
        ????????//從服務調(diào)用對象里獲取服務提供者信息
        ????????ProviderService?metaDataModel?=?request.getProviderService();
        ????????long?consumeTimeOut?=?request.getInvokeTimeout();
        ????????final?String?methodName?=?request.getInvokedMethodName();

        ????????//根據(jù)方法名稱定位到具體某一個服務提供者
        ????????String?serviceKey?=?metaDataModel.getServiceItf().getName();
        ????????//獲取限流工具類
        ????????int?workerThread?=?metaDataModel.getWorkerThreads();
        ????????Semaphore?semaphore?=?serviceKeySemaphoreMap.get(serviceKey);
        ????????if?(semaphore?==?null)?{
        ????????????synchronized?(serviceKeySemaphoreMap)?{
        ????????????????semaphore?=?serviceKeySemaphoreMap.get(serviceKey);
        ????????????????if?(semaphore?==?null)?{
        ????????????????????semaphore?=?new?Semaphore(workerThread);
        ????????????????????serviceKeySemaphoreMap.put(serviceKey,?semaphore);
        ????????????????}
        ????????????}
        ????????}

        ????????//獲取注冊中心服務
        ????????IRegisterCenter4Provider?registerCenter4Provider?=?RegisterCenter.singleton();
        ????????List?localProviderCaches?=?registerCenter4Provider.getProviderServiceMap().get(serviceKey);

        ????????Object?result?=?null;
        ????????boolean?acquire?=?false;

        ????????try?{
        ????????????ProviderService?localProviderCache?=?Collections2.filter(localProviderCaches,?new?Predicate()?{
        ????????????????@Override
        ????????????????public?boolean?apply(ProviderService?input)?{
        ????????????????????return?StringUtils.equals(input.getServiceMethod().getName(),?methodName);
        ????????????????}
        ????????????}).iterator().next();
        ????????????Object?serviceObject?=?localProviderCache.getServiceObject();

        ????????????//利用反射發(fā)起服務調(diào)用
        ????????????Method?method?=?localProviderCache.getServiceMethod();
        ????????????//利用?semaphore?實現(xiàn)限流
        ????????????acquire?=?semaphore.tryAcquire(consumeTimeOut,?TimeUnit.MILLISECONDS);
        ????????????if?(acquire)?{
        ????????????????result?=?method.invoke(serviceObject,?request.getArgs());
        ????????????????//System.out.println("---------------"+result);
        ????????????}
        ????????}?catch?(Exception?e)?{
        ????????????System.out.println(JSON.toJSONString(localProviderCaches)?+?"??"?+?methodName+"?"+e.getMessage());
        ????????????result?=?e;
        ????????}?finally?{
        ????????????if?(acquire)?{
        ????????????????semaphore.release();
        ????????????}
        ????????}
        ????????//根據(jù)服務調(diào)用結(jié)果組裝調(diào)用返回對象
        ????????StormResponse?response?=?new?StormResponse();
        ????????response.setInvokeTimeout(consumeTimeOut);
        ????????response.setUniqueKey(request.getUniqueKey());
        ????????response.setResult(result);
        ????????//將服務調(diào)用返回對象回寫到消費端
        ????????ctx.writeAndFlush(response);
        ????}?else?{
        ????????logger.error("------------channel?closed!---------------");
        ????}
        }

        此處還有部分細節(jié)如自定義的編解碼器等,篇幅所限不在此詳述,繼承 MessageToByteEncoder 和 ByteToMessageDecoder 覆寫對應的 encode 和 decode 方法即可自定義編解碼器,使用到的序列化工具如 Hessian/Proto 等可參考對應的官方文檔。

        請求和響應包裝

        為便于封裝請求和響應,定義兩個 bean 來表示請求和響應。

        請求:

        /**
        ?*?@author?孫浩
        ?*?@Descrption
        ?***/
        public?class?StormRequest?implements?Serializable?{

        ????private?static?final?long?serialVersionUID?=?-5196465012408804755L;
        ????//UUID,唯一標識一次返回值
        ????private?String?uniqueKey;
        ????//服務提供者信息
        ????private?ProviderService?providerService;
        ????//調(diào)用的方法名稱
        ????private?String?invokedMethodName;
        ????//傳遞參數(shù)
        ????private?Object[]?args;
        ????//消費端應用名
        ????private?String?appName;
        ????//消費請求超時時長
        ????private?long?invokeTimeout;
        ????//?getter/setter
        }

        響應:

        /**
        ?*?@author?孫浩
        ?*?@Descrption
        ?***/
        public?class?StormResponse?implements?Serializable?{
        ????private?static?final?long?serialVersionUID?=?5785265307118147202L;
        ????//UUID,?唯一標識一次返回值
        ????private?String?uniqueKey;
        ????//客戶端指定的服務超時時間
        ????private?long?invokeTimeout;
        ????//接口調(diào)用返回的結(jié)果對象
        ????private?Object?result;
        ????//getter/setter
        }


        客戶端(消費者)

        客戶端(消費者)在 RPC 調(diào)用中主要是生成服務接口的代理對象,并從注冊中心獲取對應的服務列表發(fā)起網(wǎng)絡請求。

        客戶端和服務端一樣采用 Spring 來管理 bean 解析 XML 配置等不再贅述,重點看下以下幾點:

        1、通過 JDK 動態(tài)代理來生成引入服務接口的代理對象

        public?Object?getProxy()?{
        ????return?Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),?new?Class[]{targetInterface},?this);
        }

        2、從注冊中心獲取服務列表并依據(jù)某種策略選取其中一個服務節(jié)點

        //服務接口名稱
        String?serviceKey?=?targetInterface.getName();
        //獲取某個接口的服務提供者列表
        IRegisterCenter4Invoker?registerCenter4Consumer?=?RegisterCenter.singleton();
        List?providerServices?=?registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
        //根據(jù)軟負載策略,從服務提供者列表選取本次調(diào)用的服務提供者
        ClusterStrategy?clusterStrategyService?=?ClusterEngine.queryClusterStrategy(clusterStrategy);
        ProviderService?providerService?=?clusterStrategyService.select(providerServices);

        3、通過 Netty 建立連接,發(fā)起網(wǎng)絡請求

        /**
        ?*?@author?孫浩
        ?*?@Descrption?Netty?消費端?bean?代理工廠
        ?***/
        public?class?RevokerProxyBeanFactory?implements?InvocationHandler?{
        ????private?ExecutorService?fixedThreadPool?=?null;
        ????//服務接口
        ????private?Class?targetInterface;
        ????//超時時間
        ????private?int?consumeTimeout;
        ????//調(diào)用者線程數(shù)
        ????private?static?int?threadWorkerNumber?=?10;
        ????//負載均衡策略
        ????private?String?clusterStrategy;

        ????@Override
        ????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{

        ????????...

        ????????//復制一份服務提供者信息
        ????????ProviderService?newProvider?=?providerService.copy();
        ????????//設置本次調(diào)用服務的方法以及接口
        ????????newProvider.setServiceMethod(method);
        ????????newProvider.setServiceItf(targetInterface);

        ????????//聲明調(diào)用?AresRequest?對象,AresRequest?表示發(fā)起一次調(diào)用所包含的信息
        ????????final?StormRequest?request?=?new?StormRequest();
        ????????//設置本次調(diào)用的唯一標識
        ????????request.setUniqueKey(UUID.randomUUID().toString()?+?"-"?+?Thread.currentThread().getId());
        ????????//設置本次調(diào)用的服務提供者信息
        ????????request.setProviderService(newProvider);
        ????????//設置本次調(diào)用的方法名稱
        ????????request.setInvokedMethodName(method.getName());
        ????????//設置本次調(diào)用的方法參數(shù)信息
        ????????request.setArgs(args);

        ????????try?{
        ????????????//構(gòu)建用來發(fā)起調(diào)用的線程池
        ????????????if?(fixedThreadPool?==?null)?{
        ????????????????synchronized?(RevokerProxyBeanFactory.class)?{
        ????????????????????if?(null?==?fixedThreadPool)?{
        ????????????????????????fixedThreadPool?=?Executors.newFixedThreadPool(threadWorkerNumber);
        ????????????????????}
        ????????????????}
        ????????????}
        ????????????//根據(jù)服務提供者的?ip,port,?構(gòu)建?InetSocketAddress?對象,標識服務提供者地址
        ????????????String?serverIp?=?request.getProviderService().getServerIp();
        ????????????int?serverPort?=?request.getProviderService().getServerPort();
        ????????????InetSocketAddress?inetSocketAddress?=?new?InetSocketAddress(serverIp,?serverPort);
        ????????????//提交本次調(diào)用信息到線程池?fixedThreadPool,?發(fā)起調(diào)用
        ????????????Future?responseFuture?=?fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress,?request));
        ????????????//獲取調(diào)用的返回結(jié)果
        ????????????StormResponse?response?=?responseFuture.get(request.getInvokeTimeout(),?TimeUnit.MILLISECONDS);
        ????????????if?(response?!=?null)?{
        ????????????????return?response.getResult();
        ????????????}
        ????????}?catch?(Exception?e)?{
        ????????????throw?new?RuntimeException(e);
        ????????}
        ????????return?null;
        ????}
        ????//??...
        }

        Netty 的響應是異步的,為了在方法調(diào)用返回前獲取到響應結(jié)果,需要將異步的結(jié)果同步化。

        4、Netty 異步返回的結(jié)果存入阻塞隊列

        @Override
        protected?void?channelRead0(ChannelHandlerContext?channelHandlerContext,?StormResponse?response)?throws?Exception?{
        ????//將?Netty?異步返回的結(jié)果存入阻塞隊列,以便調(diào)用端同步獲取
        ????RevokerResponseHolder.putResultValue(response);
        }

        5、請求發(fā)出后同步獲取結(jié)果

        //提交本次調(diào)用信息到線程池?fixedThreadPool,?發(fā)起調(diào)用
        Future?responseFuture?=?fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress,?request));
        //獲取調(diào)用的返回結(jié)果
        StormResponse?response?=?responseFuture.get(request.getInvokeTimeout(),?TimeUnit.MILLISECONDS);
        if?(response?!=?null)?{
        ????return?response.getResult();
        }

        //===================================================
        //從返回結(jié)果容器中獲取返回結(jié)果,同時設置等待超時時間為?invokeTimeout
        long?invokeTimeout?=?request.getInvokeTimeout();
        StormResponse?response?=?RevokerResponseHolder.getValue(request.getUniqueKey(),?invokeTimeout);


        ?
        3?
        測試


        Server:

        /**
        ?*?@author?孫浩
        ?*?@Descrption
        ?***/
        public?class?MainServer?{
        ????public?static?void?main(String[]?args)?throws?Exception?{
        ????????//發(fā)布服務
        ????????final?ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext("storm-server.xml");
        ????????System.out.println("?服務發(fā)布完成");
        ????}
        }

        Client:

        public?class?Client?{

        ????private?static?final?Logger?logger?=?LoggerFactory.getLogger(Client.class);

        ????public?static?void?main(String[]?args)?throws?Exception?{

        ????????final?ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext("storm-client.xml");
        ????????final?HelloService?helloService?=?(HelloService)?context.getBean("helloService");
        ????????String?result?=?helloService.sayHello("World");
        ????????System.out.println(result);
        ????????for?(;;)?{

        ????????}
        ????}
        }


        結(jié)果

        生產(chǎn)者:


        消費者:


        注冊中心:


        ?
        4?
        總結(jié)


        本文簡單介紹了 RPC 的整個流程,并實現(xiàn)了一個簡單的 RPC 調(diào)用。希望閱讀完本文之后,能加深你對 RPC 的一些認識。

        生產(chǎn)者端流程:

        • 加載服務接口,并緩存

        • 服務注冊,將服務接口以及服務主機信息寫入注冊中心(本例使用的是 ZooKeeper)

        • 啟動網(wǎng)絡服務器并監(jiān)聽

        • 反射,本地調(diào)用

        消費者端流程:

        • 代理服務接口生成代理對象

        • 服務發(fā)現(xiàn)(連接 ZooKeeper,拿到服務地址列表,通過客戶端負載策略獲取合適的服務地址)

        • 遠程方法調(diào)用(本例通過 Netty,發(fā)送消息,并獲取響應結(jié)果)

        限于篇幅,本文代碼并不完整,如有需要,可以參考以下鏈接,我貼出來了完整代碼給你。

        https://github.com/fankongqiumu/storm.git


        推薦閱讀:

        世界的真實格局分析,地球人類社會底層運行原理

        不是你需要中臺,而是一名合格的架構(gòu)師(附各大廠中臺建設PPT)

        企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案

        論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

        華為干部與人才發(fā)展手冊(附PPT)

        企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

        【中臺實踐】華為大數(shù)據(jù)中臺架構(gòu)分享.pdf

        華為的數(shù)字化轉(zhuǎn)型方法論

        華為如何實施數(shù)字化轉(zhuǎn)型(附PPT)

        超詳細280頁Docker實戰(zhàn)文檔!開放下載

        華為大數(shù)據(jù)解決方案(PPT)


        瀏覽 63
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            少妇av中文字幕 小雪被房东的好爽文 | 女人上床靠逼 | 国产又粗又长又爽 | 日本黄色免费视频网站 | 午夜成人一区 | 日韩AV一二三区 | 麻豆色播 | 18禁亚洲| 日本少妇被黑人粗大的猛激 | 操逼打炮网 |