1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        10個類手寫實現(xiàn) RPC 通信框架原理

        共 9299字,需瀏覽 19分鐘

         ·

        2020-08-27 03:48

        鏈接:autumn200.com/2020/06/21/write-rpc/

        ??? ?

        ? ?正文? ?

        什么是rpc

        RPC:remote procedure call Protocol 遠程過程調(diào)用 調(diào)用遠程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細節(jié),就像調(diào)用本機的服務(wù)一樣的

        RPC原理

        實現(xiàn)RPC通信的程序包括5個部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

        request

        • 客戶端:當(dāng)rpc-client發(fā)起遠程調(diào)用時,實際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進行序列化,然后通過socket實時將封裝調(diào)用參數(shù)的實例發(fā)送到服務(wù)端。
        • 服務(wù)端:socket接收到客戶端傳來的信息進行反序列化,然后通過這些信息委派到具體的實現(xiàn)對象

        response

        • 服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket
        • 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束

        應(yīng)用到的技術(shù)

        • java
        • spring
        • 序列化
        • socket
        • 反射
        • 動態(tài)代理

        項目GitHub地址

        https://github.com/autumnqfeng/write_rpc

        服務(wù)端項目

        項目結(jié)構(gòu)

        rpc-server項目包含2個子項目:order-api、order-provider

        • order-api中存放請求接口與RpcRequest(類名、方法名、參數(shù)的實體類)

        • order-provider為請求接口實現(xiàn)、socket、proxy相關(guān)類

        order-api

        order-provider

        服務(wù)注冊

        要想實現(xiàn)動態(tài)調(diào)用ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?

        我們可以參照spring中的@Service注解,通過自定義注解的方式來做到服務(wù)注冊,我們定義一個注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實現(xiàn)類。

        @RpcRemoteService注解

        /**
        ?*?服務(wù)端服務(wù)發(fā)現(xiàn)注解
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/21?16:21
        ?*/

        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @Component
        public?@interface?RpcRemoteService?{
        }

        服務(wù)注冊類InitialMerdiator

        在spring容器初始化完成之后,掃描到@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。

        /**
        ?*?初始化中間代理層對象
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/21?16:33
        ?*/

        @Component
        public?class?InitialMerdiator?implements?BeanPostProcessor?{

        ????@Override
        ????public?Object?postProcessAfterInitialization(Object?bean,?String?beanName)?throws?BeansException?{
        ????????//加了服務(wù)發(fā)布標(biāo)記的bean進行遠程發(fā)布
        ????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class))?{
        ????????????Method[]?methods?=?bean.getClass().getDeclaredMethods();
        ????????????for?(Method?method?:?methods)?{
        ????????????????String?routingKey?=?bean.getClass().getInterfaces()[0].getName()?+?"."?+?method.getName();
        ????????????????BeanMethod?beanMethod?=?new?BeanMethod();
        ????????????????beanMethod.setBean(bean);
        ????????????????beanMethod.setMethod(method);
        ????????????????Mediator.ROUTING.put(routingKey,?beanMethod);
        ????????????}
        ????????}
        ????????return?bean;
        ????}
        }

        socket監(jiān)聽

        socket監(jiān)聽客戶端請求

        socket啟動類SocketServer

        spring容器加載完成之后,啟動socket

        /**
        ?*?spring?容器啟動完成之后,會發(fā)布一個ContextRefreshedEven
        ?*?容器啟動后啟動socket監(jiān)聽
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/21?16:51
        ?*/

        @Component
        public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent>?{
        ????private?final?ExecutorService?executorService=?Executors.newCachedThreadPool();

        ????@Override
        ????public?void?onApplicationEvent(ContextRefreshedEvent?contextRefreshedEvent)?{
        ????????ServerSocket?serverSocket=null;
        ????????try?{
        ????????????serverSocket?=?new?ServerSocket(8888);
        ????????????while?(true)?{
        ????????????????Socket?accept?=?serverSocket.accept();
        ????????????????//?線程池處理socket
        ????executorService.execute(new?ProcessorHandler(accept));
        ????????????}
        ????????}?catch?(IOException?e)?{
        ????????????e.printStackTrace();
        ????????}?finally?{
        ????????????if?(serverSocket?!=?null)?{
        ????????????????try?{
        ????????????????????serverSocket.close();
        ????????????????}?catch?(IOException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}
        ????}

        socket處理類ProcessorHandler

        處理監(jiān)聽到的每個socket

        public?class?ProcessorHandler?implements?Runnable?{

        ????private?Socket?socket;

        ????public?ProcessorHandler(Socket?socket)?{
        ????????this.socket?=?socket;
        ????}

        ????@Override
        ????public?void?run()?{
        ????????ObjectInputStream?inputStream?=?null;
        ????????ObjectOutputStream?outputStream?=?null;
        ????????try?{
        ????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
        ????????????//?反序列化
        ????????????RpcRequest?rpcRequest?=?(RpcRequest)?inputStream.readObject();

        ????????????//?中間代理執(zhí)行目標(biāo)方法
        ????????????Mediator?mediator?=?Mediator.getInstance();
        ????????????Object?response?=?mediator.processor(rpcRequest);
        ????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);

        ????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
        ????????????outputStream.writeObject(response);
        ????????????outputStream.flush();

        ????????}?catch?(Exception?e)?{
        ????????????e.printStackTrace();
        ????????}?finally?{
        ????????????closeStream(inputStream,?outputStream);
        ????????}
        ????}

        ????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
        ????????//?關(guān)閉流
        ????????if(inputStream!=null){
        ????????????try?{
        ????????????????inputStream.close();
        ????????????}?catch?(IOException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????????if?(outputStream!=null){
        ????????????try?{
        ????????????????outputStream.close();
        ????????????}?catch?(IOException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}

        服務(wù)端代理

        Mediator

        /**
        ?*?服務(wù)端socket與目標(biāo)方法的中間代理層
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/21?16:25
        ?*/

        public?class?Mediator?{

        ????/**?用來存儲發(fā)布的服務(wù)的實例(服務(wù)調(diào)用的路由)?*/
        ????public?static?Map?ROUTING?=?new?ConcurrentHashMap<>();

        ????/**?單例模式創(chuàng)建該代理層實例?*/
        ????private?volatile?static?Mediator?instance;

        ????private?Mediator()?{
        ????}

        ????public?static?Mediator?getInstance()?{
        ????????if?(instance?==?null)?{
        ????????????synchronized?(Mediator.class)?{
        ????????????????if?(instance?==?null)?{
        ????????????????????instance?=?new?Mediator();
        ????????????????}
        ????????????}
        ????????}
        ????????return?instance;
        ????}

        ????public?Object?processor(RpcRequest?rpcRequest)?{
        ????????//?路由key
        ????????String?routingKey?=?rpcRequest.getClassName()?+?"."?+?rpcRequest.getMethodName();
        ????????BeanMethod?beanMethod?=?ROUTING.get(routingKey);
        ????????if?(beanMethod?==?null)?{
        ????????????return?null;
        ????????}
        ????????//?執(zhí)行目標(biāo)方法
        ????????Object?bean?=?beanMethod.getBean();
        ????????Method?method?=?beanMethod.getMethod();
        ????????try?{
        ????????????return?method.invoke(bean,?rpcRequest.getArgs());
        ????????}?catch?(Exception?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????????return?null;
        ????}
        }

        BeanMethod

        /**
        ?*?中間層反射調(diào)用時,存儲目標(biāo)方法、目標(biāo)類的實體
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/21?16:43
        ?*/

        public?class?BeanMethod?{

        ????private?Object?bean;

        ????private?Method?method;

        ????//?setter、getter略
        }

        客戶端項目

        項目結(jié)構(gòu)

        服務(wù)發(fā)現(xiàn)

        服務(wù)發(fā)現(xiàn)我們同樣使用注解來做,這就需要參照Spring中@Autowired的原理實現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實現(xiàn)的代理類注入到該字段上。

        @RpcReference注解

        /**
        ?*?服務(wù)注入注解
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/20?22:41
        ?*/

        @Target(ElementType.FIELD)
        @Retention(RetentionPolicy.RUNTIME)
        @Component
        public?@interface?RpcReference?{
        }

        服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy

        在spring容器初始化之前,掃描bean中所有@RpcReference注解標(biāo)記的字段。

        /**
        ?*?遠程動態(tài)調(diào)用service代理
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/20?22:44
        ?*/

        @Component
        public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{

        ????@Autowired
        ????private?RemoteInvocationHandler?invocationHandler;

        ????@Override
        ????public?Object?postProcessBeforeInitialization(Object?bean,?String?beanName)?throws?BeansException?{
        ????????Field[]?fields?=?bean.getClass().getDeclaredFields();
        ????????for?(Field?field?:?fields)?{
        ????????????if?(field.isAnnotationPresent(RpcReference.class))?{
        ????????????????field.setAccessible(true);
        ????????????????//?針對這個加了RpcReference注解的字段,設(shè)置為一個代理的值
        ????????????????Object?proxy?=?Proxy.newProxyInstance(field.getType().getClassLoader(),?new?Class[]{field.getType()},?invocationHandler);
        ????????????????try?{
        ????????????????????//?相當(dāng)于針對加了RpcReference的注解,設(shè)置了一個代理,這個代理的實現(xiàn)是invocationHandler
        ????????????????????field.set(bean,?proxy);
        ????????????????}?catch?(IllegalAccessException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}
        ????????return?bean;
        ????}
        }

        客戶端代理

        客戶端動態(tài)代理InvocationHandler實現(xiàn)類RemoteInvocationHandler

        將目標(biāo)方法名、目標(biāo)類名、參數(shù)信息封裝到RpcRequest,然后交給socket發(fā)送到服務(wù)端。

        /**
        ?*?@author:?***
        ?*?@date:?2020/6/20?22:51
        ?*/

        @Component
        public?class?RemoteInvocationHandler?implements?InvocationHandler?{

        ????@Autowired
        ????private?RpcNetTransport?rpcNetTransport;

        ????@Override
        ????public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{
        ????????RpcRequest?rpcRequest?=?new?RpcRequest();
        ????????rpcRequest.setClassName(method.getDeclaringClass().getName());
        ????????rpcRequest.setMethodName(method.getName());
        ????????rpcRequest.setTypes(method.getParameterTypes());
        ????????rpcRequest.setArgs(args);
        ????????return?rpcNetTransport.send(rpcRequest);
        ????}
        }

        客戶端socket

        網(wǎng)絡(luò)傳輸RpcNetTransport

        /**
        ?*?rpc?socket?網(wǎng)絡(luò)傳輸
        ?*
        ?*?@author:?***
        ?*?@date:?2020/6/20?22:59
        ?*/

        @Component
        public?class?RpcNetTransport?{
        ????@Value("${rpc.host}")
        ????private?String?host;
        ????@Value("${rpc.port}")
        ????private?int?port;


        ????public?Object?send(RpcRequest?rpcRequest)?{
        ????????ObjectOutputStream?outputStream?=?null;
        ????????ObjectInputStream?inputStream?=?null;
        ????????try?{
        ????????????Socket?socket?=?new?Socket(host,?port);
        ????????????//?發(fā)送目標(biāo)方法信息
        ????????????outputStream?=?new?ObjectOutputStream(socket.getOutputStream());
        ????????????outputStream.writeObject(rpcRequest);
        ????????????outputStream.flush();
        ???//?接收返回值
        ????????????inputStream?=?new?ObjectInputStream(socket.getInputStream());
        ????????????return?inputStream.readObject();
        ????????}?catch?(IOException?|?ClassNotFoundException?e)?{
        ????????????e.printStackTrace();
        ????????}?finally?{
        ????????????closeStream(inputStream,?outputStream);
        ????????}
        ????????return?null;
        ????}

        ????private?void?closeStream(ObjectInputStream?inputStream,?ObjectOutputStream?outputStream)?{
        ????????//?關(guān)閉流
        ????????if(inputStream!=null){
        ????????????try?{
        ????????????????inputStream.close();
        ????????????}?catch?(IOException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????????if?(outputStream!=null){
        ????????????try?{
        ????????????????outputStream.close();
        ????????????}?catch?(IOException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}
        }
        瀏覽 37
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            毛片超碰 | 中日韩欧美 | 国产精品精品久久久久久52AV | 久久久久久久久久久久影院 | 久久对白 | 免费又黄又爽又猛大片午夜 | 凹凸69堂国产成人精品 | 在线观看免费A片成人网站 | 操中国老女人 | 亚洲中文字幕第一 |