1. 自己動手從0開始實現(xiàn)一個分布式RPC框架

        共 11360字,需瀏覽 23分鐘

         ·

        2021-07-26 16:10


        前言

        為什么要自己寫一個RPC框架,我覺得從個人成長上說,如果一個程序員能清楚的了解RPC框架所具備的要素,掌握RPC框架中涉及的服務注冊發(fā)現(xiàn)、負載均衡、序列化協(xié)議、RPC通信協(xié)議、Socket通信、異步調(diào)用、熔斷降級等技術,可以全方位的提升基本素質(zhì)。雖然也有相關源碼,但是只看源碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優(yōu)路徑。

        一  什么是RPC

        RPC(Remote Procedure Call)遠程過程調(diào)用,簡言之就是像調(diào)用本地方法一樣調(diào)用遠程服務。目前外界使用較多的有gRPC、Dubbo、Spring Cloud等。相信大家對RPC的概念都已經(jīng)很熟悉了,這里不做過多介紹。

        二  分布式RPC框架要素

        一款分布式RPC框架離不開三個基本要素:

        • 服務提供方 Serivce Provider

        • 服務消費方 Servce Consumer

        • 注冊中心 Registery


        圍繞上面三個基本要素可以進一步擴展服務路由、負載均衡、服務熔斷降級、序列化協(xié)議、通信協(xié)議等等。


        1  注冊中心

        主要是用來完成服務注冊和發(fā)現(xiàn)的工作。雖然服務調(diào)用是服務消費方直接發(fā)向服務提供方的,但是現(xiàn)在服務都是集群部署,服務的提供者數(shù)量也是動態(tài)變化的,所以服務的地址也就無法預先確定。因此如何發(fā)現(xiàn)這些服務就需要一個統(tǒng)一注冊中心來承載。

        2  服務提供方(RPC服務端)

        其需要對外提供服務接口,它需要在應用啟動時連接注冊中心,將服務名及其服務元數(shù)據(jù)發(fā)往注冊中心。同時需要提供服務服務下線的機制。需要維護服務名和真正服務地址映射。服務端還需要啟動Socket服務監(jiān)聽客戶端請求。

        3  服務消費方(RPC客戶端)

        客戶端需要有從注冊中心獲取服務的基本能力,它需要在應用啟動時,掃描依賴的RPC服務,并為其生成代理調(diào)用對象,同時從注冊中心拉取服務元數(shù)據(jù)存入本地緩存,然后發(fā)起監(jiān)聽各服務的變動做到及時更新緩存。在發(fā)起服務調(diào)用時,通過代理調(diào)用對象,從本地緩存中獲取服務地址列表,然后選擇一種負載均衡策略篩選出一個目標地址發(fā)起調(diào)用。調(diào)用時會對請求數(shù)據(jù)進行序列化,并采用一種約定的通信協(xié)議進行socket通信。

        三  技術選型

        1  注冊中心

        目前成熟的注冊中心有Zookeeper,Nacos,Consul,Eureka,它們的主要比較如下:


        本實現(xiàn)中支持了兩種注冊中心Nacos和Zookeeper,可根據(jù)配置進行切換。

        2  IO通信框架 

        本實現(xiàn)采用Netty作為底層通信框架,Netty是一個高性能事件驅(qū)動型的非阻塞的IO(NIO)框架。

        3  通信協(xié)議

        TCP通信過程中會根據(jù)TCP緩沖區(qū)的實際情況進行包的劃分,所以在業(yè)務上認為一個完整的包可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。所以需要對發(fā)送的數(shù)據(jù)包封裝到一種通信協(xié)議里。

        業(yè)界的主流協(xié)議的解決方案可以歸納如下:

        1. 消息定長,例如每個報文的大小為固定長度100字節(jié),如果不夠用空格補足。

        2. 在包尾特殊結束符進行分割。

        3. 將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。


        很明顯1,2都有些局限性,本實現(xiàn)采用方案3,具體協(xié)議設計如下:

        +--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ |  BYTE  |        |        |        |        |        |        |             ........ +--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ |  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | +--------+-----------------------------------------------------------------------------------------+--------------------------------------------+

        • 第一個字節(jié)是魔法數(shù),比如我定義為0X35。

        • 第二個字節(jié)代表協(xié)議版本號,以便對協(xié)議進行擴展,使用不同的協(xié)議解析器。

        • 第三個字節(jié)是請求類型,如0代表請求1代表響應。

        • 第四個字節(jié)表示消息長度,即此四個字節(jié)后面此長度的內(nèi)容是消息content。


        4  序列化協(xié)議

        本實現(xiàn)支持3種序列化協(xié)議,JavaSerializer、Protobuf及Hessian可以根據(jù)配置靈活選擇。建議選用Protobuf,其序列化后碼流小性能高,非常適合RPC調(diào)用,Google自家的gRPC也是用其作為通信協(xié)議。

        5  負載均衡

        本實現(xiàn)支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。

        四  整體架構


        五  實現(xiàn)

        項目總體結構:

         
        1  服務注冊發(fā)現(xiàn)


        Zookeeper

        Zookeeper采用節(jié)點樹的數(shù)據(jù)模型,類似linux文件系統(tǒng),/,/node1,/node2 比較簡單。


        Zookeeper節(jié)點類型是Zookeeper實現(xiàn)很多功能的核心原理,分為持久節(jié)點臨時節(jié)點、順序節(jié)點三種類型的節(jié)點。

        我們采用的是對每個服務名創(chuàng)建一個持久節(jié)點,服務注冊時實際上就是在zookeeper中該持久節(jié)點下創(chuàng)建了一個臨時節(jié)點,該臨時節(jié)點存儲了服務的IP、端口、序列化方式等。


        客戶端獲取服務時通過獲取持久節(jié)點下的臨時節(jié)點列表,解析服務地址數(shù)據(jù):


        客戶端監(jiān)聽服務變化:


        Nacos

        Nacos是阿里開源的微服務管理中間件,用來完成服務之間的注冊發(fā)現(xiàn)和配置中心,相當于Spring Cloud的Eureka+Config。

        不像Zookeeper需要利用提供的創(chuàng)建節(jié)點特性來實現(xiàn)注冊發(fā)現(xiàn),Nacos專門提供了注冊發(fā)現(xiàn)功能,所以其使用更加方便簡單。主要關注NamingService接口提供的三個方法registerInstance、getAllInstances、subscribe;registerInstance用來完成服務端服務注冊,getAllInstances用來完成客戶端服務獲取,subscribe用來完成客戶端服務變動監(jiān)聽,這里就不多做介紹,具體可參照實現(xiàn)源碼。

        2  服務提供方 Serivce Provider

        在自動配置類OrcRpcAutoConfiguration完成注冊中心和RPC啟動類(RpcBootStarter)的初始化:


        服務端的啟動流程如下:


        RPC啟動(RpcBootStarter):


        上面監(jiān)聽Spring容器初始化事件時注意,由于Spring包含多個容器,如web容器和核心容器,他們還有父子關系,為了避免重復執(zhí)行注冊,只處理頂層的容器即可。

        3  服務消費方 Servce Consumer

        服務消費方需要在應用啟動完成前為依賴的服務創(chuàng)建好代理對象,這里有很多種方法,常見的有兩種:

        • 一是在應用的Spring Context初始化完成事件時觸發(fā),掃描所有的Bean,將Bean中帶有OrcRpcConsumer注解的field獲取到,然后創(chuàng)建field類型的代理對象,創(chuàng)建完成后,將代理對象set給此field。后續(xù)就通過該代理對象創(chuàng)建服務端連接,并發(fā)起調(diào)用。


        • 二是通過Spring的BeanFactoryPostProcessor,其可以對bean的定義BeanDefinition(配置元數(shù)據(jù))進行處理;Spring IOC會在容器實例化任何其他bean之前運行BeanFactoryPostProcessor讀取BeanDefinition,可以修改這些BeanDefinition,也可以新增一些BeanDefinition。


        本實現(xiàn)也采用第二種方式,處理流程如下:


        BeanFactoryPostProcessor的主要實現(xiàn):

            @Override    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)        throws BeansException {        this.beanFactory = beanFactory;        postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry)beanFactory);    }
        private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry beanDefinitionRegistry) { String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames(); int len = beanDefinitionNames.length; for (int i = 0; i < len; i++) { String beanDefinitionName = beanDefinitionNames[i]; BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName); String beanClassName = beanDefinition.getBeanClassName(); if (beanClassName != null) { Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader); ReflectionUtils.doWithFields(clazz, new FieldCallback() { @Override public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { parseField(field); } }); }
        }
        Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator(); while (it.hasNext()) { Entry<String, BeanDefinition> entry = it.next(); if (context.containsBean(entry.getKey())) { throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey()); } beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue()); log.info("register OrcRpcConsumerBean definition: {}", entry.getKey()); }
        }
        private void parseField(Field field) { // 獲取所有OrcRpcConsumer注解 OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class); if (orcRpcConsumer != null) { // 使用field的類型和OrcRpcConsumer注解一起生成BeanDefinition OrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder = new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer); BeanDefinition beanDefinition = beanDefinitionBuilder.build(); beanDefinitions.put(field.getName(), beanDefinition); } }

        ProxyFactory的主要實現(xiàn):

        public class JdkProxyFactory implements ProxyFactory{
        @Override public Object getProxy(ServiceMetadata serviceMetadata) { return Proxy .newProxyInstance(serviceMetadata.getClazz().getClassLoader(), new Class[] {serviceMetadata.getClazz()}, new ClientInvocationHandler(serviceMetadata)); }
        private class ClientInvocationHandler implements InvocationHandler {
        private ServiceMetadata serviceMetadata;
        public ClientInvocationHandler(ServiceMetadata serviceMetadata) { this.serviceMetadata = serviceMetadata; }
        @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String serviceId = ServiceUtils.getServiceId(serviceMetadata); // 通過負載均衡器選取一個服務提供方地址 ServiceURL service = InvocationServiceSelector.select(serviceMetadata);
        OrcRpcRequest request = new OrcRpcRequest(); request.setMethod(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); request.setRequestId(UUID.randomUUID().toString()); request.setServiceId(serviceId);
        OrcRpcResponse response = InvocationClientContainer.getInvocationClient(service.getServerNet()).invoke(request, service); if (response.getStatus() == RpcStatusEnum.SUCCESS) { return response.getData(); } else if (response.getException() != null) { throw new OrcRpcException(response.getException().getMessage()); } else { throw new OrcRpcException(response.getStatus().name()); } } }}

        本實現(xiàn)只使用JDK動態(tài)代理,也可以使用cglib或Javassist實現(xiàn)以獲得更好的性能,JdkProxyFactory中。

        4  IO模塊


        UML圖如下:


        結構比較清晰,分三大模塊:客戶端調(diào)用適配模塊、服務端請求響應適配模塊和Netty IO服務模塊。

        客戶端調(diào)用適配模塊

        此模塊比較簡單,主要是為客戶端調(diào)用時建立服務端接,并將連接存入緩存,避免后續(xù)同服務調(diào)用重復建立連接,連接建立成功后發(fā)起調(diào)用。下面是DefaultInvocationClient的實現(xiàn):


        服務端請求響應適配模塊

        服務請求響應模塊也比較簡單,是根據(jù)請求中的服務名,從緩存中獲取服務元數(shù)據(jù),然后從請求中獲取調(diào)用的方法和參數(shù)類型信息,反射獲取調(diào)用方法信息。然后從spring context中獲取bean進行反射調(diào)用。


        Netty IO服務模塊

        Netty IO服務模塊是核心,稍復雜一些,客戶端和服務端主要處理流程如下:


        其中,重點是四個類的實現(xiàn):NettyNetClient、NettyNetServer、NettyClientChannelRequestHandler和NettyServerChannelRequestHandler,上面的UML圖和下面流程圖基本上講清楚了它們的關系和一次請求的處理流程,這里就不再展開了。

        下面重點講一下編碼解碼器。

        在技術選型章節(jié)中,提及了采用的通信協(xié)議,定義了私有的RPC協(xié)議:

        +--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ |  BYTE  |        |        |        |        |        |        |             ........ +--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ |  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | +--------+-----------------------------------------------------------------------------------------+--------------------------------------------+


        • 第一個字節(jié)是魔法數(shù)定義為0X35。

        • 第二個字節(jié)代表協(xié)議版本號。

        • 第三個字節(jié)是請求類型,0代表請求1代表響應。

        • 第四個字節(jié)表示消息長度,即此四個字節(jié)后面此長度的內(nèi)容是消息content。


        編碼器的實現(xiàn)如下:

        @Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf)    throws Exception {    // 寫入?yún)f(xié)議頭    byteBuf.writeByte(ProtocolConstant.MAGIC);    // 寫入版本    byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);    // 寫入請求類型    byteBuf.writeByte(protocolMsg.getMsgType());    // 寫入消息長度    byteBuf.writeInt(protocolMsg.getContent().length);    // 寫入消息內(nèi)容    byteBuf.writeBytes(protocolMsg.getContent());}

        解碼器的實現(xiàn)如下:

        /** * 協(xié)議開始的標志 magic + version + type + length 占據(jù)7個字節(jié) */public final int BASE_LENGTH = 7;
        @Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 可讀字節(jié)小于基本長度,無法解析出payload長度,返回 if (byteBuf.readableBytes() < BASE_LENGTH) { return; } // 記錄包頭開始的index int beginIndex; while (true) { // 記錄包頭開始的index beginIndex = byteBuf.readerIndex(); // 標記包頭開始的index byteBuf.markReaderIndex(); // 讀到了協(xié)議頭魔數(shù),結束循環(huán) if (byteBuf.readByte() == ProtocolConstant.MAGIC) { break; } // 未讀到包頭,略過一個字節(jié) // 每次略過一個字節(jié),去讀取包頭信息的開始標記 byteBuf.resetReaderIndex(); byteBuf.readByte();
        /** * 當略過,一個字節(jié)之后,數(shù)據(jù)包的長度,又變得不滿足 * 此時結束。等待后面的數(shù)據(jù)到達 */ if (byteBuf.readableBytes() < BASE_LENGTH) { return; } } // 讀取版本號 byte version = byteBuf.readByte(); // 讀取消息類型 byte type = byteBuf.readByte(); // 讀取消息長度 int length = byteBuf.readInt(); // 判斷本包是否完整 if (byteBuf.readableBytes() < length) { // 還原讀指針 byteBuf.readerIndex(beginIndex); return; } byte[] data = new byte[length]; byteBuf.readBytes(data);
        ProtocolMsg msg = new ProtocolMsg(); msg.setMsgType(type); msg.setContent(data); list.add(msg);}

        六  測試

        在本人MacBook Pro 13寸,4核I5,16g內(nèi)存,使用Nacos注冊中心,啟動一個服務器,一個客戶端情況下,采用輪詢負載均衡策略的情況下,使用Apache ab測試。

        在啟用8個線程發(fā)起10000個請求的情況下,可以做到18秒完成所有請求,qps550:



        在啟用100個線程發(fā)起10000個請求的情況下,可以做到13.8秒完成所有請求,qps724:


        七  總結

        在實現(xiàn)這個RPC框架的過程中,我也重新學習了很多知識,比如通信協(xié)議、IO框架等。也橫向?qū)W習了當前最熱的gRPC,借此又看了很多相關的源碼,收獲很大。后續(xù)我也會繼續(xù)維護升級這個框架,比如引入熔斷降級等機制,做到持續(xù)學習持續(xù)進步。

        推薦閱讀:

        《微服務中的灰度發(fā)布是什么?實現(xiàn)思路又是什么?》



        聊技術,不止于技術。

        在這里我會分享技術文章、管理知識以及個人的思想感悟,歡迎點擊關注。
        瀏覽 80
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 国产专区在线播放 | 亚洲AV永久一区二区三区蜜桃 | 白丝校花被狂揉大胸 | 高清无码-熊猫成人网 | 动漫美女巨乳被吸羞羞视频 |