1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        10 個(gè)類手寫實(shí)現(xiàn) RPC 通信框架

        共 9493字,需瀏覽 19分鐘

         ·

        2020-08-07 01:25

        鏈接:autumn200.com



        么是rpc

        RPC:remote procedure call Protocol 遠(yuǎn)程過程調(diào)用 調(diào)用遠(yuǎn)程服務(wù),就像調(diào)用本地的服務(wù)一樣,不用關(guān)心調(diào)用細(xì)節(jié),就像調(diào)用本機(jī)的服務(wù)一樣的

        RPC原理

        實(shí)現(xiàn)RPC通信的程序包括5個(gè)部分:rpc-client、客戶端proxy、socket、服務(wù)端proxy、rpc-server

        request


        • 客戶端:當(dāng)rpc-client發(fā)起遠(yuǎn)程調(diào)用時(shí),實(shí)際上是通過客戶端代理 將要調(diào)用的接口、方法、參數(shù)、參數(shù)類型進(jìn)行序列化,然后通過socket實(shí)時(shí)將封裝調(diào)用參數(shù)的實(shí)例發(fā)送到服務(wù)端。
        • 服務(wù)端:socket接收到客戶端傳來的信息進(jìn)行反序列化,然后通過這些信息委派到具體的實(shí)現(xiàn)對象

        response

        • 服務(wù)端:目標(biāo)方法執(zhí)行完成后,將執(zhí)行結(jié)果返回給socket
        • 客戶端:socket接收到結(jié)果后,返回給rpc-client,調(diào)用結(jié)束

        應(yīng)用到的技術(shù)

        • java
        • spring
        • 序列化
        • socket
        • 反射
        • 動(dòng)態(tài)代理


        項(xiàng)目GitHub地址

        https://github.com/autumnqfeng/write_rpc

        掘金地址

        https://juejin.im/post/5eef0fe551882565a459e8b6

        服務(wù)端項(xiàng)目


        項(xiàng)目結(jié)構(gòu)

        rpc-server項(xiàng)目包含2個(gè)子項(xiàng)目:order-api、order-provider
        order-api中存放請求接口與RpcRequest(類名、方法名、參數(shù)的實(shí)體類)
        order-provider為請求接口實(shí)現(xiàn)、socket、proxy相關(guān)類

        order-api

        order-provider

        服務(wù)注冊

        要想實(shí)現(xiàn)動(dòng)態(tài)調(diào)用ServiceImpl,關(guān)鍵就需要將service類管理起來,那問題來了,我們?nèi)绾喂芾磉@些服務(wù)類呢?
        我們可以參照spring中的@Service注解,通過自定義注解的方式來做到服務(wù)注冊,我們定義一個(gè)注解@RpcRemoteService作用在ServiceImpl類上,將標(biāo)記此注解的類名、方法名保存到Map中,以此來定位到具體實(shí)現(xiàn)類。
        @RpcRemoteService注解
        /**
        ?* 服務(wù)端服務(wù)發(fā)現(xiàn)注解
        ?*
        ?* @author: ***
        ?* @date: 2020/6/21 16:21
        ?*/

        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @Component
        public @interface?RpcRemoteService {
        }


        服務(wù)注冊類InitialMerdiator
        在spring容器初始化完成之后,掃描到@RpcRemoteService標(biāo)記的類,并保存到Mediator.ROUTING中。
        /**
        ?* 初始化中間代理層對象
        ?*
        ?* @author: ***
        ?* @date: 2020/6/21 16:33
        ?*/

        @Component
        public?class?InitialMerdiator?implements?BeanPostProcessor?{

        ????@Override
        ????public?Object postProcessAfterInitialization(Object bean, String beanName)?throws?BeansException {
        ????????//加了服務(wù)發(fā)布標(biāo)記的bean進(jìn)行遠(yuǎn)程發(fā)布
        ????????if?(bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
        ????????????Method[] methods = bean.getClass().getDeclaredMethods();
        ????????????for?(Method method : methods) {
        ????????????????String routingKey = bean.getClass().getInterfaces()[0].getName() + "."?+ method.getName();
        ????????????????BeanMethod beanMethod = new?BeanMethod();
        ????????????????beanMethod.setBean(bean);
        ????????????????beanMethod.setMethod(method);
        ????????????????Mediator.ROUTING.put(routingKey, beanMethod);
        ????????????}
        ????????}
        ????????return?bean;
        ????}
        }
        socket監(jiān)聽
        socket監(jiān)聽客戶端請求
        socket啟動(dòng)類SocketServer
        spring容器加載完成之后,啟動(dòng)socket
        /**
        ?* spring 容器啟動(dòng)完成之后,會(huì)發(fā)布一個(gè)ContextRefreshedEven
        ?* 容器啟動(dòng)后啟動(dòng)socket監(jiān)聽
        ?*
        ?* @author: ***
        ?* @date: 2020/6/21 16:51
        ?*/

        @Component
        public?class?SocketServer?implements?ApplicationListener<ContextRefreshedEvent> {
        ????private?final?ExecutorService executorService= Executors.newCachedThreadPool();

        ????@Override
        ????public?void?onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent)?{
        ????????ServerSocket serverSocket=null;
        ????????try?{
        ????????????serverSocket = new?ServerSocket(8888);
        ????????????while?(true) {
        ????????????????Socket accept = serverSocket.accept();
        ????????????????// 線程池處理socket
        ????????executorService.execute(new?ProcessorHandler(accept));
        ????????????}
        ????????} catch?(IOException e) {
        ????????????e.printStackTrace();
        ????????} finally?{
        ????????????if?(serverSocket != null) {
        ????????????????try?{
        ????????????????????serverSocket.close();
        ????????????????} catch?(IOException e) {
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}
        ????}
        socket處理類ProcessorHandler
        處理監(jiān)聽到的每個(gè)socket
        public?class?ProcessorHandler?implements?Runnable?{

        ????private?Socket socket;

        ????public?ProcessorHandler(Socket socket)?{
        ????????this.socket = socket;
        ????}

        ????@Override
        ????public?void?run()?{
        ????????ObjectInputStream inputStream = null;
        ????????ObjectOutputStream outputStream = null;
        ????????try?{
        ????????????inputStream = new?ObjectInputStream(socket.getInputStream());
        ????????????// 反序列化
        ????????????RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();

        ????????????// 中間代理執(zhí)行目標(biāo)方法
        ????????????Mediator mediator = Mediator.getInstance();
        ????????????Object response = mediator.processor(rpcRequest);
        ????????????System.out.println("服務(wù)端的執(zhí)行結(jié)果:"+response);

        ????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
        ????????????outputStream.writeObject(response);
        ????????????outputStream.flush();

        ????????} catch?(Exception e) {
        ????????????e.printStackTrace();
        ????????} finally?{
        ????????????closeStream(inputStream, outputStream);
        ????????}
        ????}

        ????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
        ????????// 關(guān)閉流
        ????????if(inputStream!=null){
        ????????????try?{
        ????????????????inputStream.close();
        ????????????} catch?(IOException e) {
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????????if?(outputStream!=null){
        ????????????try?{
        ????????????????outputStream.close();
        ????????????} catch?(IOException e) {
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}
        服務(wù)端代理
        Mediator
        /**
        ?* 服務(wù)端socket與目標(biāo)方法的中間代理層
        ?*
        ?* @author: ***
        ?* @date: 2020/6/21 16:25
        ?*/

        public?class?Mediator?{

        ????/** 用來存儲(chǔ)發(fā)布的服務(wù)的實(shí)例(服務(wù)調(diào)用的路由) */
        ????public?static?Map ROUTING = new?ConcurrentHashMap<>();

        ????/** 單例模式創(chuàng)建該代理層實(shí)例 */
        ????private?volatile?static?Mediator instance;

        ????private?Mediator() {
        ????}

        ????public?static?Mediator getInstance() {
        ????????if?(instance == null) {
        ????????????synchronized (Mediator.class) {
        ????????????????if?(instance == null) {
        ????????????????????instance = new?Mediator();
        ????????????????}
        ????????????}
        ????????}
        ????????return?instance;
        ????}

        ????public?Object processor(RpcRequest rpcRequest) {
        ????????// 路由key
        ????????String routingKey = rpcRequest.getClassName() + "."?+ rpcRequest.getMethodName();
        ????????BeanMethod beanMethod = ROUTING.get(routingKey);
        ????????if?(beanMethod == null) {
        ????????????return?null;
        ????????}
        ????????// 執(zhí)行目標(biāo)方法
        ????????Object bean = beanMethod.getBean();
        ????????Method method = beanMethod.getMethod();
        ????????try?{
        ????????????return?method.invoke(bean, rpcRequest.getArgs());
        ????????} catch?(Exception e) {
        ????????????e.printStackTrace();
        ????????}
        ????????return?null;
        ????}
        }


        BeanMethod
        /**
        ?* 中間層反射調(diào)用時(shí),存儲(chǔ)目標(biāo)方法、目標(biāo)類的實(shí)體
        ?*
        ?* @author: ***
        ?* @date: 2020/6/21 16:43
        ?*/

        public?class?BeanMethod?{

        ????private?Object bean;

        ????private?Method method;

        ????// setter、getter略
        }


        客戶端項(xiàng)目


        項(xiàng)目結(jié)構(gòu)

        服務(wù)發(fā)現(xiàn)

        服務(wù)發(fā)現(xiàn)我們同樣使用注解來做,這就需要參照Spring中@Autowired的原理實(shí)現(xiàn),我們自定義@RpcReference注解,定義在字段上,將接口實(shí)現(xiàn)的代理類注入到該字段上。
        @RpcReference注解
        /**
        ?* 服務(wù)注入注解
        ?*
        ?* @author: ***
        ?* @date: 2020/6/20 22:41
        ?*/

        @Target(ElementType.FIELD)
        @Retention(RetentionPolicy.RUNTIME)
        @Component
        public @interface?RpcReference {


        服務(wù)發(fā)現(xiàn)類ReferenceInvokeProxy
        在spring容器初始化之前,掃描bean中所有@RpcReference注解標(biāo)記的字段。
        /**
        ?* 遠(yuǎn)程動(dòng)態(tài)調(diào)用service代理
        ?*
        ?* @author: ***
        ?* @date: 2020/6/20 22:44
        ?*/

        @Component
        public?class?ReferenceInvokeProxy?implements?BeanPostProcessor?{

        ????@Autowired
        ????private?RemoteInvocationHandler invocationHandler;

        ????@Override
        ????public?Object postProcessBeforeInitialization(Object bean, String beanName)?throws?BeansException {
        ????????Field[] fields = bean.getClass().getDeclaredFields();
        ????????for?(Field field : fields) {
        ????????????if?(field.isAnnotationPresent(RpcReference.class)) {
        ????????????????field.setAccessible(true);
        ????????????????// 針對這個(gè)加了RpcReference注解的字段,設(shè)置為一個(gè)代理的值
        ????????????????Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new?Class[]{field.getType()}, invocationHandler);
        ????????????????try?{
        ????????????????????// 相當(dāng)于針對加了RpcReference的注解,設(shè)置了一個(gè)代理,這個(gè)代理的實(shí)現(xiàn)是invocationHandler
        ????????????????????field.set(bean, proxy);
        ????????????????} catch?(IllegalAccessException e) {
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}
        ????????return?bean;
        ????}
        }
        客戶端代理
        客戶端動(dòng)態(tài)代理InvocationHandler實(shí)現(xiàn)類RemoteInvocationHandler
        將目標(biāo)方法名、目標(biāo)類名、參數(shù)信息封裝到RpcRequest,然后交給socket發(fā)送到服務(wù)端。
        /**
        ?* @author: ***
        ?* @date: 2020/6/20 22:51
        ?*/

        @Component
        public?class?RemoteInvocationHandler?implements?InvocationHandler?{

        ????@Autowired
        ????private?RpcNetTransport rpcNetTransport;

        ????@Override
        ????public?Object invoke(Object proxy, Method method, Object[] args)?throws?Throwable {
        ????????RpcRequest rpcRequest = new?RpcRequest();
        ????????rpcRequest.setClassName(method.getDeclaringClass().getName());
        ????????rpcRequest.setMethodName(method.getName());
        ????????rpcRequest.setTypes(method.getParameterTypes());
        ????????rpcRequest.setArgs(args);
        ????????return?rpcNetTransport.send(rpcRequest);
        ????}
        }
        客戶端socket
        網(wǎng)絡(luò)傳輸RpcNetTransport


        /**
        * rpc socket 網(wǎng)絡(luò)傳輸
        ?*
        ?* @author: ***
        ?* @date: 2020/6/20 22:59
        ?*/

        @Component
        public?class?RpcNetTransport?{
        ????@Value("${rpc.host}")
        ????private?String host;
        ????@Value("${rpc.port}")
        ????private?int?port;


        ????public?Object send(RpcRequest rpcRequest)?{
        ????????ObjectOutputStream outputStream = null;
        ????????ObjectInputStream inputStream = null;
        ????????try?{
        ????????????Socket socket = new?Socket(host, port);
        ????????????// 發(fā)送目標(biāo)方法信息
        ????????????outputStream = new?ObjectOutputStream(socket.getOutputStream());
        ????????????outputStream.writeObject(rpcRequest);
        ????????????outputStream.flush();
        ??????// 接收返回值
        ????????????inputStream = new?ObjectInputStream(socket.getInputStream());
        ????????????return?inputStream.readObject();
        ????????} catch?(IOException | ClassNotFoundException e) {
        ????????????e.printStackTrace();
        ????????} finally?{
        ????????????closeStream(inputStream, outputStream);
        ????????}
        ????????return?null;
        ????}

        ????private?void?closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream)?{
        ????????// 關(guān)閉流
        ????????if(inputStream!=null){
        ????????????try?{
        ????????????????inputStream.close();
        ????????????} catch?(IOException e) {
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????????if?(outputStream!=null){
        ????????????try?{
        ????????????????outputStream.close();
        ????????????} catch?(IOException e) {
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}
        }




        -END-

        如果看到這里,說明你喜歡這篇文章,請?轉(zhuǎn)發(fā)、點(diǎn)贊。同時(shí)?標(biāo)星(置頂)本公眾號可以第一時(shí)間接受到博文推送。

        博主微信:baiseyumaoxx,之前博主分享了很多資源,有的已經(jīng)刪除了(你懂得),如果有的你當(dāng)時(shí)沒有領(lǐng)到還想領(lǐng)得就可以加我微信,我在發(fā)給你,你需要得資源也可以給我說,我盡力給你找~


        明天見(??ω??)??


        喜歡文章,點(diǎn)個(gè)在看?

        瀏覽 66
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91蜜桃传媒在线观看 | 免费无码成人电影 | 韩国大尺度电影《陷阱》 | jizzjizzjizz少妇 | 操12p| 国产女人18毛片水18精品变态 | 欧美伦理精品亚洲专区 | 欧美激情视频一区二区 | 天天操天天射天天操 | 日韩人妻一区二区三区蜜桃 |