1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        手寫(xiě)RPC框架,理解更透徹,代碼已上傳Github!

        共 17148字,需瀏覽 35分鐘

         ·

        2021-02-19 00:26

        程序員的成長(zhǎng)之路
        互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
        關(guān)注


        閱讀本文大概需要 9?分鐘。

        來(lái)自:https://www.cnblogs.com/2YSP/p/13545217.html

        一、前言

        前段時(shí)間看到一篇不錯(cuò)的文章《看了這篇你就會(huì)手寫(xiě)RPC框架了》,于是便來(lái)了興趣對(duì)著實(shí)現(xiàn)了一遍,后面覺(jué)得還有很多優(yōu)化的地方便對(duì)其進(jìn)行了改進(jìn)。
        主要改動(dòng)點(diǎn)如下:
        1. 除了Java序列化協(xié)議,增加了protobuf和kryo序列化協(xié)議,配置即用。
        2. 增加多種負(fù)載均衡算法(隨機(jī)、輪詢、加權(quán)輪詢、平滑加權(quán)輪詢),配置即用。
        3. 客戶端增加本地服務(wù)列表緩存,提高性能。
        4. 修復(fù)高并發(fā)情況下,netty導(dǎo)致的內(nèi)存泄漏問(wèn)題
        5. 由原來(lái)的每個(gè)請(qǐng)求建立一次連接,改為建立TCP長(zhǎng)連接,并多次復(fù)用。
        6. 服務(wù)端增加線程池提高消息處理能力

        二、介紹

        RPC,即 Remote Procedure Call(遠(yuǎn)程過(guò)程調(diào)用),調(diào)用遠(yuǎn)程計(jì)算機(jī)上的服務(wù),就像調(diào)用本地服務(wù)一樣。RPC可以很好的解耦系統(tǒng),如WebService就是一種基于Http協(xié)議的RPC。
        調(diào)用示意圖
        調(diào)用示意圖
        總的來(lái)說(shuō),就如下幾個(gè)步驟:
        1. 客戶端(ServerA)執(zhí)行遠(yuǎn)程方法時(shí)就調(diào)用client stub傳遞類(lèi)名、方法名和參數(shù)等信息。
        2. client stub會(huì)將參數(shù)等信息序列化為二進(jìn)制流的形式,然后通過(guò)Sockect發(fā)送給服務(wù)端(ServerB)
        3. 服務(wù)端收到數(shù)據(jù)包后,server stub 需要進(jìn)行解析反序列化為類(lèi)名、方法名和參數(shù)等信息。
        4. server stub調(diào)用對(duì)應(yīng)的本地方法,并把執(zhí)行結(jié)果返回給客戶端
        所以一個(gè)RPC框架有如下角色:

        服務(wù)消費(fèi)者

        遠(yuǎn)程方法的調(diào)用方,即客戶端。一個(gè)服務(wù)既可以是消費(fèi)者也可以是提供者。

        服務(wù)提供者

        遠(yuǎn)程服務(wù)的提供方,即服務(wù)端。一個(gè)服務(wù)既可以是消費(fèi)者也可以是提供者。

        注冊(cè)中心

        保存服務(wù)提供者的服務(wù)地址等信息,一般由zookeeper、redis等實(shí)現(xiàn)。

        監(jiān)控運(yùn)維(可選)

        監(jiān)控接口的響應(yīng)時(shí)間、統(tǒng)計(jì)請(qǐng)求數(shù)量等,及時(shí)發(fā)現(xiàn)系統(tǒng)問(wèn)題并發(fā)出告警通知。

        三、實(shí)現(xiàn)

        本RPC框架rpc-spring-boot-starter涉及技術(shù)棧如下:
        • 使用zookeeper作為注冊(cè)中心
        • 使用netty作為通信框架
        • 消息編解碼:protostuff、kryo、java
        • spring
        • 使用SPI來(lái)根據(jù)配置動(dòng)態(tài)選擇負(fù)載均衡算法等
        由于代碼過(guò)多,這里只講幾處改動(dòng)點(diǎn)。

        3.1動(dòng)態(tài)負(fù)載均衡算法

        1.編寫(xiě)LoadBalance的實(shí)現(xiàn)類(lèi)
        負(fù)載均衡算法實(shí)現(xiàn)類(lèi)
        2.自定義注解?@LoadBalanceAno

        /**
        ?*?負(fù)載均衡注解
        ?*/

        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @Documented
        public?@interface?LoadBalanceAno?{

        ????String?value()?default?"";
        }

        /**
        ?*?輪詢算法
        ?*/

        @LoadBalanceAno(RpcConstant.BALANCE_ROUND)
        public?class?FullRoundBalance?implements?LoadBalance?{

        ????private?static?Logger?logger?=?LoggerFactory.getLogger(FullRoundBalance.class);

        ????private?volatile?int?index;

        ????@Override
        ????public?synchronized?Service?chooseOne(List?services)?{
        ????????//?加鎖防止多線程情況下,index超出services.size()
        ????????if?(index?==?services.size())?{
        ????????????index?=?0;
        ????????}
        ????????return?services.get(index++);
        ????}
        }

        3.新建在resource目錄下META-INF/servers文件夾并創(chuàng)建文件
        enter description here
        4.RpcConfig增加配置項(xiàng)loadBalance

        /**
        ?*?@author?2YSP
        ?*?@date?2020/7/26?15:13
        ?*/

        @ConfigurationProperties(prefix?=?"sp.rpc")
        public?class?RpcConfig?{

        ????/**
        ?????*?服務(wù)注冊(cè)中心地址
        ?????*/

        ????private?String?registerAddress?=?"127.0.0.1:2181";

        ????/**
        ?????*?服務(wù)暴露端口
        ?????*/

        ????private?Integer?serverPort?=?9999;
        ????/**
        ?????*?服務(wù)協(xié)議
        ?????*/

        ????private?String?protocol?=?"java";
        ????/**
        ?????*?負(fù)載均衡算法
        ?????*/

        ????private?String?loadBalance?=?"random";
        ????/**
        ?????*?權(quán)重,默認(rèn)為1
        ?????*/

        ????private?Integer?weight?=?1;

        ???//?省略getter?setter
        }

        5.在自動(dòng)配置類(lèi)RpcAutoConfiguration根據(jù)配置選擇對(duì)應(yīng)的算法實(shí)現(xiàn)類(lèi)

        /**
        ?????*?使用spi匹配符合配置的負(fù)載均衡算法
        ?????*
        ?????*?@param?name
        ?????*?@return
        ?????*/

        ????private?LoadBalance?getLoadBalance(String?name)?{
        ????????ServiceLoader?loader?=?ServiceLoader.load(LoadBalance.class);
        ????????Iterator?iterator?=?loader.iterator();
        ????????while?(iterator.hasNext())?{
        ????????????LoadBalance?loadBalance?=?iterator.next();
        ????????????LoadBalanceAno?ano?=?loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
        ????????????Assert.notNull(ano,?"load?balance?name?can?not?be?empty!");
        ????????????if?(name.equals(ano.value()))?{
        ????????????????return?loadBalance;
        ????????????}
        ????????}
        ????????throw?new?RpcException("invalid?load?balance?config");
        ????}

        ?@Bean
        ????public?ClientProxyFactory?proxyFactory(@Autowired?RpcConfig?rpcConfig)?{
        ????????ClientProxyFactory?clientProxyFactory?=?new?ClientProxyFactory();
        ????????//?設(shè)置服務(wù)發(fā)現(xiàn)著
        ????????clientProxyFactory.setServerDiscovery(new???????????ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));

        ????????//?設(shè)置支持的協(xié)議
        ????????Map?supportMessageProtocols?=?buildSupportMessageProtocols();
        ????????clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
        ????????//?設(shè)置負(fù)載均衡算法
        ????????LoadBalance?loadBalance?=?getLoadBalance(rpcConfig.getLoadBalance());
        ????????clientProxyFactory.setLoadBalance(loadBalance);
        ????????//?設(shè)置網(wǎng)絡(luò)層實(shí)現(xiàn)
        ????????clientProxyFactory.setNetClient(new?NettyNetClient());

        ????????return?clientProxyFactory;
        ????}

        3.2本地服務(wù)列表緩存

        使用Map來(lái)緩存數(shù)據(jù)

        /**
        ?*?服務(wù)發(fā)現(xiàn)本地緩存
        ?*/

        public?class?ServerDiscoveryCache?{
        ????/**
        ?????*?key:?serviceName
        ?????*/

        ????private?static?final?Map>?SERVER_MAP?=?new?ConcurrentHashMap<>();
        ????/**
        ?????*?客戶端注入的遠(yuǎn)程服務(wù)service?class
        ?????*/

        ????public?static?final?List?SERVICE_CLASS_NAMES?=?new?ArrayList<>();

        ????public?static?void?put(String?serviceName,?List?serviceList)?{
        ????????SERVER_MAP.put(serviceName,?serviceList);
        ????}

        ????/**
        ?????*?去除指定的值
        ?????*?@param?serviceName
        ?????*?@param?service
        ?????*/

        ????public?static?void?remove(String?serviceName,?Service?service)?{
        ????????SERVER_MAP.computeIfPresent(serviceName,?(key,?value)?->
        ????????????????value.stream().filter(o?->?!o.toString().equals(service.toString())).collect(Collectors.toList())
        ????????);
        ????}

        ????public?static?void?removeAll(String?serviceName)?{
        ????????SERVER_MAP.remove(serviceName);
        ????}


        ????public?static?boolean?isEmpty(String?serviceName)?{
        ????????return?SERVER_MAP.get(serviceName)?==?null?||?SERVER_MAP.get(serviceName).size()?==?0;
        ????}

        ????public?static?List?get(String?serviceName)?{
        ????????return?SERVER_MAP.get(serviceName);
        ????}
        }

        ClientProxyFactory,先查本地緩存,緩存沒(méi)有再查詢zookeeper。

        /**
        ?????*?根據(jù)服務(wù)名獲取可用的服務(wù)地址列表
        ?????*?@param?serviceName
        ?????*?@return
        ?????*/

        ????private?List?getServiceList(String?serviceName)?{
        ????????List?services;
        ????????synchronized?(serviceName){
        ????????????if?(ServerDiscoveryCache.isEmpty(serviceName))?{
        ????????????????services?=?serverDiscovery.findServiceList(serviceName);
        ????????????????if?(services?==?null?||?services.size()?==?0)?{
        ????????????????????throw?new?RpcException("No?provider?available!");
        ????????????????}
        ????????????????ServerDiscoveryCache.put(serviceName,?services);
        ????????????}?else?{
        ????????????????services?=?ServerDiscoveryCache.get(serviceName);
        ????????????}
        ????????}
        ????????return?services;
        ????}

        問(wèn)題:?如果服務(wù)端因?yàn)殄礄C(jī)或網(wǎng)絡(luò)問(wèn)題下線了,緩存卻還在就會(huì)導(dǎo)致客戶端請(qǐng)求已經(jīng)不可用的服務(wù)端,增加請(qǐng)求失敗率。**解決方案:**由于服務(wù)端注冊(cè)的是臨時(shí)節(jié)點(diǎn),所以如果服務(wù)端下線節(jié)點(diǎn)會(huì)被移除。只要監(jiān)聽(tīng)zookeeper的子節(jié)點(diǎn),如果新增或刪除子節(jié)點(diǎn)就直接清空本地緩存即可。
        DefaultRpcProcessor

        /**
        ?*?Rpc處理者,支持服務(wù)啟動(dòng)暴露,自動(dòng)注入Service
        ?*?@author?2YSP
        ?*?@date?2020/7/26?14:46
        ?*/

        public?class?DefaultRpcProcessor?implements?ApplicationListener<ContextRefreshedEvent>?{

        ???

        ????@Override
        ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
        ????????//?Spring啟動(dòng)完畢過(guò)后會(huì)收到一個(gè)事件通知
        ????????if?(Objects.isNull(event.getApplicationContext().getParent())){
        ????????????ApplicationContext?context?=?event.getApplicationContext();
        ????????????//?開(kāi)啟服務(wù)
        ????????????startServer(context);
        ????????????//?注入Service
        ????????????injectService(context);
        ????????}
        ????}

        ????private?void?injectService(ApplicationContext?context)?{
        ????????String[]?names?=?context.getBeanDefinitionNames();
        ????????for(String?name?:?names){
        ????????????Class?clazz?=?context.getType(name);
        ????????????if?(Objects.isNull(clazz)){
        ????????????????continue;
        ????????????}

        ????????????Field[]?declaredFields?=?clazz.getDeclaredFields();
        ????????????for(Field?field?:?declaredFields){
        ????????????????//?找出標(biāo)記了InjectService注解的屬性
        ????????????????InjectService?injectService?=?field.getAnnotation(InjectService.class);
        ????????????????if?(injectService?==?null){
        ????????????????????continue;
        ????????????????}

        ????????????????Class?fieldClass?=?field.getType();
        ????????????????Object?object?=?context.getBean(name);
        ????????????????field.setAccessible(true);
        ????????????????try?{
        ????????????????????field.set(object,clientProxyFactory.getProxy(fieldClass));
        ????????????????}?catch?(IllegalAccessException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????//?添加本地服務(wù)緩存
        ????????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
        ????????????}
        ????????}
        ????????//?注冊(cè)子節(jié)點(diǎn)監(jiān)聽(tīng)
        ????????if?(clientProxyFactory.getServerDiscovery()?instanceof?ZookeeperServerDiscovery){
        ????????????ZookeeperServerDiscovery?serverDiscovery?=?(ZookeeperServerDiscovery)?clientProxyFactory.getServerDiscovery();
        ????????????ZkClient?zkClient?=?serverDiscovery.getZkClient();
        ????????????ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name?->{
        ????????????????String?servicePath?=?RpcConstant.ZK_SERVICE_PATH?+?RpcConstant.PATH_DELIMITER?+?name?+?"/service";
        ????????????????zkClient.subscribeChildChanges(servicePath,?new?ZkChildListenerImpl());
        ????????????});
        ????????????logger.info("subscribe?service?zk?node?successfully");
        ????????}

        ????}

        ????private?void?startServer(ApplicationContext?context)?{
        ????????...

        ????}
        }

        ZkChildListenerImpl

        /**
        ?*?子節(jié)點(diǎn)事件監(jiān)聽(tīng)處理類(lèi)
        ?*/

        public?class?ZkChildListenerImpl?implements?IZkChildListener?{

        ????private?static?Logger?logger?=?LoggerFactory.getLogger(ZkChildListenerImpl.class);

        ????/**
        ?????*?監(jiān)聽(tīng)子節(jié)點(diǎn)的刪除和新增事件
        ?????*?@param?parentPath?/rpc/serviceName/service
        ?????*?@param?childList
        ?????*?@throws?Exception
        ?????*/

        ????@Override
        ????public?void?handleChildChange(String?parentPath,?List?childList)?throws?Exception?{
        ????????logger.debug("Child?change?parentPath:[{}]?--?childList:[{}]",?parentPath,?childList);
        ????????//?只要子節(jié)點(diǎn)有改動(dòng)就清空緩存
        ????????String[]?arr?=?parentPath.split("/");
        ????????ServerDiscoveryCache.removeAll(arr[2]);
        ????}
        }

        3.3nettyClient支持TCP長(zhǎng)連接

        這部分的改動(dòng)最多,先增加新的sendRequest接口。
        添加接口
        實(shí)現(xiàn)類(lèi)NettyNetClient

        /**
        ?*?@author?2YSP
        ?*?@date?2020/7/25?20:12
        ?*/

        public?class?NettyNetClient?implements?NetClient?{

        ????private?static?Logger?logger?=?LoggerFactory.getLogger(NettyNetClient.class);

        ????private?static?ExecutorService?threadPool?=?new?ThreadPoolExecutor(4,?10,?200,
        ????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(1000),?new?ThreadFactoryBuilder()
        ????????????.setNameFormat("rpcClient-%d")
        ????????????.build());

        ????private?EventLoopGroup?loopGroup?=?new?NioEventLoopGroup(4);

        ????/**
        ?????*?已連接的服務(wù)緩存
        ?????* key:?服務(wù)地址,格式:ip:port
        ?????*/

        ????public?static?Map?connectedServerNodes?=?new?ConcurrentHashMap<>();

        ????@Override
        ????public?byte[]?sendRequest(byte[]?data,?Service?service)?throws?InterruptedException?{
        ??....
        ????????return?respData;
        ????}

        ????@Override
        ????public?RpcResponse?sendRequest(RpcRequest?rpcRequest,?Service?service,?MessageProtocol?messageProtocol)?{

        ????????String?address?=?service.getAddress();
        ????????synchronized?(address)?{
        ????????????if?(connectedServerNodes.containsKey(address))?{
        ????????????????SendHandlerV2?handler?=?connectedServerNodes.get(address);
        ????????????????logger.info("使用現(xiàn)有的連接");
        ????????????????return?handler.sendRequest(rpcRequest);
        ????????????}

        ????????????String[]?addrInfo?=?address.split(":");
        ????????????final?String?serverAddress?=?addrInfo[0];
        ????????????final?String?serverPort?=?addrInfo[1];
        ????????????final?SendHandlerV2?handler?=?new?SendHandlerV2(messageProtocol,?address);
        ????????????threadPool.submit(()?->?{
        ????????????????????????//?配置客戶端
        ????????????????????????Bootstrap?b?=?new?Bootstrap();
        ????????????????????????b.group(loopGroup).channel(NioSocketChannel.class)
        ????????????????????????????????.option(ChannelOption.TCP_NODELAY,?true)
        ????????????????????????????????.handler(new?ChannelInitializer<SocketChannel>()?
        {
        ????????????????????????????????????@Override
        ????????????????????????????????????protected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{
        ????????????????????????????????????????ChannelPipeline?pipeline?=?socketChannel.pipeline();
        ????????????????????????????????????????pipeline
        ????????????????????????????????????????????????.addLast(handler);
        ????????????????????????????????????}
        ????????????????????????????????});
        ????????????????????????//?啟用客戶端連接
        ????????????????????????ChannelFuture?channelFuture?=?b.connect(serverAddress,?Integer.parseInt(serverPort));
        ????????????????????????channelFuture.addListener(new?ChannelFutureListener()?{
        ????????????????????????????@Override
        ????????????????????????????public?void?operationComplete(ChannelFuture?channelFuture)?throws?Exception?{
        ????????????????????????????????connectedServerNodes.put(address,?handler);
        ????????????????????????????}
        ????????????????????????});
        ????????????????????}
        ????????????);
        ????????????logger.info("使用新的連接。。。");
        ????????????return?handler.sendRequest(rpcRequest);
        ????????}
        ????}
        }

        每次請(qǐng)求都會(huì)調(diào)用sendRequest()方法,用線程池異步和服務(wù)端創(chuàng)建TCP長(zhǎng)連接,連接成功后將SendHandlerV2緩存到ConcurrentHashMap中方便復(fù)用,后續(xù)請(qǐng)求的請(qǐng)求地址(ip+port)如果在connectedServerNodes中存在則使用connectedServerNodes中的handler處理不再重新建立連接。
        SendHandlerV2

        /**
        ?*?@author?2YSP
        ?*?@date?2020/8/19?20:06
        ?*/

        public?class?SendHandlerV2?extends?ChannelInboundHandlerAdapter?{

        ????private?static?Logger?logger?=?LoggerFactory.getLogger(SendHandlerV2.class);

        ????/**
        ?????*?等待通道建立最大時(shí)間
        ?????*/

        ????static?final?int?CHANNEL_WAIT_TIME?=?4;
        ????/**
        ?????*?等待響應(yīng)最大時(shí)間
        ?????*/

        ????static?final?int?RESPONSE_WAIT_TIME?=?8;

        ????private?volatile?Channel?channel;

        ????private?String?remoteAddress;

        ????private?static?Map>?requestMap?=?new?ConcurrentHashMap<>();

        ????private?MessageProtocol?messageProtocol;

        ????private?CountDownLatch?latch?=?new?CountDownLatch(1);

        ????public?SendHandlerV2(MessageProtocol?messageProtocol,String?remoteAddress)?{
        ????????this.messageProtocol?=?messageProtocol;
        ????????this.remoteAddress?=?remoteAddress;
        ????}

        ????@Override
        ????public?void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????this.channel?=?ctx.channel();
        ????????latch.countDown();
        ????}

        ????@Override
        ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????logger.debug("Connect?to?server?successfully:{}",?ctx);
        ????}

        ????@Override
        ????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
        ????????logger.debug("Client?reads?message:{}",?msg);
        ????????ByteBuf?byteBuf?=?(ByteBuf)?msg;
        ????????byte[]?resp?=?new?byte[byteBuf.readableBytes()];
        ????????byteBuf.readBytes(resp);
        ????????//?手動(dòng)回收
        ????????ReferenceCountUtil.release(byteBuf);
        ????????RpcResponse?response?=?messageProtocol.unmarshallingResponse(resp);
        ????????RpcFuture?future?=?requestMap.get(response.getRequestId());
        ????????future.setResponse(response);
        ????}

        ????@Override
        ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
        ????????cause.printStackTrace();
        ????????logger.error("Exception?occurred:{}",?cause.getMessage());
        ????????ctx.close();
        ????}

        ????@Override
        ????public?void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????ctx.flush();
        ????}

        ????@Override
        ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????super.channelInactive(ctx);
        ????????logger.error("channel?inactive?with?remoteAddress:[{}]",remoteAddress);
        ????????NettyNetClient.connectedServerNodes.remove(remoteAddress);

        ????}

        ????@Override
        ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
        ????????super.userEventTriggered(ctx,?evt);
        ????}

        ????public?RpcResponse?sendRequest(RpcRequest?request)?{
        ????????RpcResponse?response;
        ????????RpcFuture?future?=?new?RpcFuture<>();
        ????????requestMap.put(request.getRequestId(),?future);
        ????????try?{
        ????????????byte[]?data?=?messageProtocol.marshallingRequest(request);
        ????????????ByteBuf?reqBuf?=?Unpooled.buffer(data.length);
        ????????????reqBuf.writeBytes(data);
        ????????????if?(latch.await(CHANNEL_WAIT_TIME,TimeUnit.SECONDS)){
        ????????????????channel.writeAndFlush(reqBuf);
        ????????????????//?等待響應(yīng)
        ????????????????response?=?future.get(RESPONSE_WAIT_TIME,?TimeUnit.SECONDS);
        ????????????}else?{
        ????????????????throw?new?RpcException("establish?channel?time?out");
        ????????????}
        ????????}?catch?(Exception?e)?{
        ????????????throw?new?RpcException(e.getMessage());
        ????????}?finally?{
        ????????????requestMap.remove(request.getRequestId());
        ????????}
        ????????return?response;
        ????}
        }

        RpcFuture
        package?cn.sp.rpc.client.net;

        import?java.util.concurrent.*;

        /**
        ?*?@author?2YSP
        ?*?@date?2020/8/19?22:31
        ?*/

        public?class?RpcFuture<T>?implements?Future<T>?{

        ????private?T?response;
        ????/**
        ?????*?因?yàn)檎?qǐng)求和響應(yīng)是一一對(duì)應(yīng)的,所以這里是1
        ?????*/

        ????private?CountDownLatch?countDownLatch?=?new?CountDownLatch(1);
        ????/**
        ?????*?Future的請(qǐng)求時(shí)間,用于計(jì)算Future是否超時(shí)
        ?????*/

        ????private?long?beginTime?=?System.currentTimeMillis();

        ????@Override
        ????public?boolean?cancel(boolean?mayInterruptIfRunning)?{
        ????????return?false;
        ????}

        ????@Override
        ????public?boolean?isCancelled()?{
        ????????return?false;
        ????}

        ????@Override
        ????public?boolean?isDone()?{
        ????????if?(response?!=?null)?{
        ????????????return?true;
        ????????}
        ????????return?false;
        ????}

        ????/**
        ?????*?獲取響應(yīng),直到有結(jié)果才返回
        ?????*?@return
        ?????*?@throws?InterruptedException
        ?????*?@throws?ExecutionException
        ?????*/

        ????@Override
        ????public?T?get()?throws?InterruptedException,?ExecutionException?{
        ????????countDownLatch.await();
        ????????return?response;
        ????}

        ????@Override
        ????public?T?get(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?ExecutionException,?TimeoutException?{
        ????????if?(countDownLatch.await(timeout,unit)){
        ????????????return?response;
        ????????}
        ????????return?null;
        ????}

        ????public?void?setResponse(T?response)?{
        ????????this.response?=?response;
        ????????countDownLatch.countDown();
        ????}

        ????public?long?getBeginTime()?{
        ????????return?beginTime;
        ????}
        }

        此處邏輯,第一次執(zhí)行 SendHandlerV2#sendRequest()?時(shí)channel需要等待通道建立好之后才能發(fā)送請(qǐng)求,所以用CountDownLatch來(lái)控制,等待通道建立。自定義Future+requestMap緩存來(lái)實(shí)現(xiàn)netty的請(qǐng)求和阻塞等待響應(yīng),RpcRequest對(duì)象在創(chuàng)建時(shí)會(huì)生成一個(gè)請(qǐng)求的唯一標(biāo)識(shí)requestId,發(fā)送請(qǐng)求前先將RpcFuture緩存到requestMap中,key為requestId,讀取到服務(wù)端的響應(yīng)信息后(channelRead方法),將響應(yīng)結(jié)果放入對(duì)應(yīng)的RpcFuture中。SendHandlerV2#channelInactive()?方法中,如果連接的服務(wù)端異常斷開(kāi)連接了,則及時(shí)清理緩存中對(duì)應(yīng)的serverNode。

        四、壓力測(cè)試

        測(cè)試環(huán)境:
        • (英特爾)Intel(R) Core(TM) i5-6300HQ CPU @ 2.30GHz 4核
        • windows10家庭版(64位)
        • 16G內(nèi)存
        1.本地啟動(dòng)zookeeper 2.本地啟動(dòng)一個(gè)消費(fèi)者,兩個(gè)服務(wù)端,輪詢算法 3.使用ab進(jìn)行壓力測(cè)試,4個(gè)線程發(fā)送10000個(gè)請(qǐng)求

        ab?-c?4?-n?10000?http://localhost:8080/test/user?id=1

        測(cè)試結(jié)果
        測(cè)試結(jié)果
        從圖片可以看出,10000個(gè)請(qǐng)求只用了11s,比之前的130+秒耗時(shí)減少了10倍以上。
        代碼地址:
        https://github.com/2YSP/rpc-spring-boot-starter
        https://github.com/2YSP/rpc-example

        推薦閱讀:

        程序員牛逼的摸魚(yú)神器來(lái)了?上班也可以在看股票、基金實(shí)時(shí)數(shù)據(jù)~

        一行代碼搞定Spring Boot反爬蟲(chóng),防止接口盜刷!

        5T技術(shù)資源大放送!包括但不限于:C/C++,Linux,Python,Java,PHP,人工智能,單片機(jī),樹(shù)莓派,等等。在公眾號(hào)內(nèi)回復(fù)「2048」,即可免費(fèi)獲取??!

        微信掃描二維碼,關(guān)注我的公眾號(hào)

        朕已閱?

        瀏覽 100
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            青青草视频在线网站 | 熟女人妻在线 | 亚洲最大黄色 | 91色情小说| 揉捏穆桂英双乳三级潘金莲 | 国产精品99久久久久久98AV | 激情无码网| 青青操在线视频 | 美女被c在线观看视频 | 北条麻妃九九九精品高清在线 |