Alibaba final-round interview: how would you design a high-performance gateway?

2021-02-06 11:59

Source: cnblogs.com/2YSP/p/14223892.html

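1. Preface

I recently read through the design of the Soul gateway on GitHub and got interested enough to write a high-performance gateway of my own from scratch. After two weeks of development, the core features of my gateway, ship-gate, are mostly complete. Its biggest shortcoming is that it has no admin UI, because my front-end skills are weak.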

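2. Design

2.1 Technology selection

A gateway is the entry point for every request, so it needs very high throughput. Asynchronous request handling is the usual way to get there, and there are currently two common approaches:

• Tomcat/Jetty + NIO + Servlet 3

Servlet 3 already supports asynchronous processing, and this approach is widely used: JD.com, Youzan, and Zuul all take it.

• Netty + NIO

Netty was built for high concurrency, and Vipshop's gateway currently uses this approach. According to a Vipshop technical article, under the same conditions Netty reaches a throughput of 300,000+ per second versus 130,000+ for Tomcat, a noticeable gap. The drawback is that with Netty you have to handle the HTTP protocol yourself, which is fairly tedious.

I later found that the Soul gateway is built on Spring WebFlux (which runs on Netty underneath), so there is little need to worry about HTTP protocol handling. I decided to use Spring WebFlux as well.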

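The second property a gateway needs is extensibility. Netflix Zuul, for example, has preFilters, postFilters, and so on, which make it easy to handle different concerns at different stages. This can be implemented by processing requests through a chain based on the chain-of-responsibility pattern.

In a microservice architecture, every service is deployed as multiple instances for high availability. When a request reaches the gateway, the gateway has to find all available instances from the URL, which requires service registration and discovery, that is, a registry.

The popular registries today are Apache Zookeeper and Alibaba's Nacos (Consul is somewhat niche). I had already used Zookeeper when writing an RPC framework, so this time I chose Nacos.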

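2.2 Requirements

First, the goal has to be clear: which features should the gateway have? They are summarized below:

• Custom route rules

  Route rules can be set per version. The match object is one of DEFAULT, HEADER, or QUERY, and the match method is one of =, regex, or like.

• Cross-language

  HTTP is language-neutral by nature.

• High performance

  Netty itself is a high-performance communication framework. In addition, the server caches data such as route rules in JVM memory so that it does not have to call the admin service on each request.

• High availability

  The gateway is stateless and supports cluster mode to avoid a single point of failure.

• Gray release

  A gray release (also called a canary release) is a release method that allows a smooth transition between black and white. It makes A/B testing possible: some users keep using product feature A while others start using feature B. If users have no objections to B, the scope is widened step by step until all users have been migrated to B. This can be achieved with the first feature, custom route rules.

• API authentication

  Based on the chain-of-responsibility pattern; users only need to develop their own authentication plugin.

• Load balancing

  Several load-balancing algorithms are supported, such as random, round-robin, and weighted round-robin. They are loaded dynamically according to configuration through the SPI mechanism.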
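
To make the weighted round-robin option in the list above more concrete, here is a minimal sketch of the "smooth" variant of that algorithm. It is not the ship-gate implementation: the class name `WeightedRoundRobinSketch`, its methods, and the addresses are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of smooth weighted round-robin; not taken from ship-gate. */
class WeightedRoundRobinSketch {

    /** One backend instance with a fixed weight and a running score. */
    static final class Node {
        final String address;
        final int weight;
        int current; // running score, updated on every pick

        Node(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    private final List<Node> nodes = new ArrayList<>();
    private int totalWeight;

    synchronized void add(String address, int weight) {
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive");
        }
        nodes.add(new Node(address, weight));
        totalWeight += weight;
    }

    /** Raise every score by its weight, pick the highest, then lower the winner by the total weight. */
    synchronized String chooseOne() {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no instances available");
        }
        Node best = null;
        for (Node n : nodes) {
            n.current += n.weight;
            if (best == null || n.current > best.current) {
                best = n;
            }
        }
        best.current -= totalWeight;
        return best.address;
    }

    public static void main(String[] args) {
        WeightedRoundRobinSketch lb = new WeightedRoundRobinSketch();
        lb.add("10.0.0.1:8080", 3);
        lb.add("10.0.0.2:8080", 1);
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.chooseOne());
        }
    }
}
```

With weights 3 and 1, every four picks send three requests to the first instance and one to the second, and the picks are interleaved instead of sent in bursts.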

2.3 Architecture design

After studying several excellent gateways (Zuul, Spring Cloud Gateway, and Soul), I split the project into the following modules.

Name                              Description
ship-admin                        Admin console for configuring route rules and so on
ship-server                       Gateway server, the core module
ship-client-spring-boot-starter   Gateway client that automatically registers service information with the registry
ship-common                       Shared code such as POJOs and constants

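The relationships between them are shown in the figure:

(Figure: gateway design)

Note: this figure differs slightly from the actual implementation. The step where Nacos pushes to the local cache is not implemented; at present ship-server only pulls by polling on a schedule. The step where ship-admin fetches registered service information from Nacos has also been changed: ServiceA sends an HTTP request to notify ship-admin when it starts.

2.4 Table schema design

(Figure: table schema)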

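3. Implementation

3.1 ship-client-spring-boot-starter

First, create a spring-boot-starter named ship-client-spring-boot-starter. If you do not know how to write a custom starter, see my earlier post "Developing your own starter".

Its core class, AutoRegisterListener, does two things when the application starts:

1. Registers the service information with the Nacos registry.

2. Notifies ship-admin that the service is online, and registers a shutdown hook that reports when it goes offline.

The code is as follows: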

/**
 * Created by 2YSP on 2020/12/21
 */
public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {

    private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);

    private volatile AtomicBoolean registered = new AtomicBoolean(false);

    private final ClientConfigProperties properties;

    @NacosInjected
    private NamingService namingService;

    @Autowired
    private RequestMappingHandlerMapping handlerMapping;

    private final ExecutorService pool;

    /**
     * url list to ignore
     */
    private static List<String> ignoreUrlList = new LinkedList<>();

    static {
        ignoreUrlList.add("/error");
    }

    public AutoRegisterListener(ClientConfigProperties properties) {
        if (!check(properties)) {
            LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
            throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
        }
        this.properties = properties;
        pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /**
     * check the ClientConfigProperties
     *
     * @param properties
     * @return
     */
    private boolean check(ClientConfigProperties properties) {
        if (properties.getPort() == null || properties.getContextPath() == null
                || properties.getVersion() == null || properties.getAppName() == null
                || properties.getAdminUrl() == null) {
            return false;
        }
        return true;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        doRegister();
        registerShutDownHook();
    }

    /**
     * send unregister request to admin when jvm shutdown
     */
    private void registerShutDownHook() {
        final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
        final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
        unregisterAppDTO.setAppName(properties.getAppName());
        unregisterAppDTO.setVersion(properties.getVersion());
        unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
        unregisterAppDTO.setPort(properties.getPort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            OkhttpTool.doPost(url, unregisterAppDTO);
            LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
        }));
    }

    /**
     * register all interface info to register center
     */
    private void doRegister() {
        Instance instance = new Instance();
        instance.setIp(IpUtil.getLocalIpAddress());
        instance.setPort(properties.getPort());
        instance.setEphemeral(true);
        Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put("version", properties.getVersion());
        metadataMap.put("appName", properties.getAppName());
        instance.setMetadata(metadataMap);
        try {
            namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
        } catch (NacosException e) {
            LOGGER.error("register to nacos fail", e);
            throw new ShipException(e.getErrCode(), e.getErrMsg());
        }
        LOGGER.info("register interface info to nacos success!");
        // send register request to ship-admin
        String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
        RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
        OkhttpTool.doPost(url, registerAppDTO);
        LOGGER.info("register to ship-admin success!");
    }

    private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
        RegisterAppDTO registerAppDTO = new RegisterAppDTO();
        registerAppDTO.setAppName(properties.getAppName());
        registerAppDTO.setContextPath(properties.getContextPath());
        registerAppDTO.setIp(instance.getIp());
        registerAppDTO.setPort(instance.getPort());
        registerAppDTO.setVersion(properties.getVersion());
        return registerAppDTO;
    }
}
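
The `compareAndSet(false, true)` guard in `onApplicationEvent` is what keeps registration from running twice if the refresh event is delivered more than once. The following stand-alone sketch isolates that idea using only the JDK. `RegisterOnceSketch`, `onEvent`, and the call counter are hypothetical stand-ins for the listener, its event callback, and `doRegister()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a run-once guard; the counter stands in for doRegister(). */
class RegisterOnceSketch {

    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final AtomicInteger registerCalls = new AtomicInteger();

    /** Stand-in for onApplicationEvent: may be called several times, possibly concurrently. */
    void onEvent() {
        // Only the first caller flips false -> true; every later caller returns here.
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        registerCalls.incrementAndGet();
    }

    int registerCalls() {
        return registerCalls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RegisterOnceSketch sketch = new RegisterOnceSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(sketch::onEvent);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("register calls: " + sketch.registerCalls());
    }
}
```

Because the `AtomicBoolean` already provides the needed visibility guarantees, the field that holds it can be `final`; the `volatile` modifier in the listing is not required for the guard to work.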

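3.2 ship-server

The ship-server project has two main parts: (1) the main flow for dynamically routing requests, and (2) synchronizing the local cache with ship-admin and Nacos, which is covered in section 3.3.

ship-server implements dynamic routing by intercepting requests with a WebFilter and handing each request to a plugin chain for chained processing.

PluginFilter parses the appName from the URL, then assembles the enabled plugins into a plugin chain.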

public class PluginFilter implements WebFilter {

    private ServerConfigProperties properties;

    public PluginFilter(ServerConfigProperties properties) {
        this.properties = properties;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String appName = parseAppName(exchange);
        if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        PluginChain pluginChain = new PluginChain(properties, appName);
        pluginChain.addPlugin(new DynamicRoutePlugin(properties));
        pluginChain.addPlugin(new AuthPlugin(properties));
        return pluginChain.execute(exchange, pluginChain);
    }

    private String parseAppName(ServerWebExchange exchange) {
        RequestPath path = exchange.getRequest().getPath();
        String appName = path.value().split("/")[1];
        return appName;
    }
}
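
The URL handling in this flow comes down to two string operations: take the first path segment as the appName, then strip that segment when building the backend URL. Here is a minimal sketch using plain strings instead of the WebFlux types. The class `GatewayPathSketch` and the choice to return `null` for a path with no segment are assumptions, not the author's code.

```java
/** Hypothetical sketch of appName parsing and backend URL building with plain strings. */
class GatewayPathSketch {

    /** Returns the first path segment, e.g. "/order/v1/list" gives "order"; null if there is none. */
    static String parseAppName(String path) {
        if (path == null) {
            return null;
        }
        String[] parts = path.split("/");
        // parts[0] is the empty string before the leading slash
        if (parts.length > 1 && !parts[1].isEmpty()) {
            return parts[1];
        }
        return null;
    }

    /** Drops the leading "/appName" and points the rest of the path at the chosen instance. */
    static String buildUrl(String ip, int port, String appName, String path, String query) {
        String prefix = "/" + appName;
        String rest = path.startsWith(prefix) ? path.substring(prefix.length()) : path;
        String url = "http://" + ip + ":" + port + rest;
        if (query == null || query.isEmpty()) {
            return url;
        }
        return url + "?" + query;
    }

    public static void main(String[] args) {
        String path = "/order/v1/list";
        String appName = parseAppName(path);
        System.out.println(appName);
        System.out.println(buildUrl("10.0.0.1", 8080, appName, path, "id=1"));
    }
}
```

The length check matters because splitting `"/"` yields an empty array, so indexing `[1]` directly, as `parseAppName` in the listing does, would throw for a request to the root path.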

PluginChain extends AbstractShipPlugin and holds all the plugins to be executed.

/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class PluginChain extends AbstractShipPlugin {
    /**
     * the pos point to current plugin
     */
    private int pos;
    /**
     * the plugins of chain
     */
    private List<ShipPlugin> plugins;

    private final String appName;

    public PluginChain(ServerConfigProperties properties, String appName) {
        super(properties);
        this.appName = appName;
    }

    /**
     * add enabled plugin to chain
     *
     * @param shipPlugin
     */
    public void addPlugin(ShipPlugin shipPlugin) {
        if (plugins == null) {
            plugins = new ArrayList<>();
        }
        if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
            return;
        }
        plugins.add(shipPlugin);
        // order by the plugin's order
        plugins.sort(Comparator.comparing(ShipPlugin::order));
    }

    @Override
    public Integer order() {
        return null;
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        if (pos == plugins.size()) {
            return exchange.getResponse().setComplete();
        }
        return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
    }

    public String getAppName() {
        return appName;
    }

}
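
Stripped of the reactive types, the chain is a sorted list plus a cursor: each plugin does its work and then asks the chain to run the next one. The sketch below shows that mechanism with plain Java. All names in it (`PluginChainSketch`, `Plugin`, `Chain`, the "auth" and "route" plugins and their order values) are illustrative assumptions, not the actual ship-gate plugins or their configured order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical, non-reactive sketch of a chain-of-responsibility plugin chain. */
class PluginChainSketch {

    interface Plugin {
        int order();

        void execute(StringBuilder trace, Chain chain);
    }

    /** Holds the plugins sorted by order plus a cursor; one instance per request. */
    static final class Chain {
        private final List<Plugin> plugins = new ArrayList<>();
        private int pos;

        void add(Plugin plugin) {
            plugins.add(plugin);
            plugins.sort(Comparator.comparingInt(Plugin::order));
        }

        void next(StringBuilder trace) {
            if (pos == plugins.size()) {
                trace.append("done");
                return;
            }
            plugins.get(pos++).execute(trace, this);
        }
    }

    /** Builds a plugin that records its name and then passes control down the chain. */
    static Plugin named(String name, int order) {
        return new Plugin() {
            @Override
            public int order() {
                return order;
            }

            @Override
            public void execute(StringBuilder trace, Chain chain) {
                trace.append(name).append(" -> ");
                chain.next(trace);
            }
        };
    }

    static String run() {
        Chain chain = new Chain();
        chain.add(named("route", 2));
        chain.add(named("auth", 1));
        StringBuilder trace = new StringBuilder();
        chain.next(trace);
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The cursor is per-request state, which is why a chain like this has to be created for each request, as `PluginFilter` does with `new PluginChain(...)`, and cannot be shared between requests.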

AbstractShipPlugin implements the ShipPlugin interface and holds the ServerConfigProperties configuration object.

public abstract class AbstractShipPlugin implements ShipPlugin {

    protected ServerConfigProperties properties;

    public AbstractShipPlugin(ServerConfigProperties properties) {
        this.properties = properties;
    }
}

The ShipPlugin interface defines the three methods every plugin must implement: order(), name(), and execute().

public interface ShipPlugin {
    /**
     * lower values have higher priority
     *
     * @return
     */
    Integer order();

    /**
     * return current plugin name
     *
     * @return
     */
    String name();

    Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain);

}

DynamicRoutePlugin extends the abstract class AbstractShipPlugin and contains the main business logic for dynamic routing.

/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/25
 */
public class DynamicRoutePlugin extends AbstractShipPlugin {

    private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);

    private static WebClient webClient;

    private static final Gson gson = new GsonBuilder().create();

    static {
        HttpClient httpClient = HttpClient.create()
                .tcpConfiguration(client ->
                        client.doOnConnected(conn ->
                                conn.addHandlerLast(new ReadTimeoutHandler(3))
                                        .addHandlerLast(new WriteTimeoutHandler(3)))
                                .option(ChannelOption.TCP_NODELAY, true)
                );
        webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();
    }

    public DynamicRoutePlugin(ServerConfigProperties properties) {
        super(properties);
    }

    @Override
    public Integer order() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
    }

    @Override
    public String name() {
        return ShipPluginEnum.DYNAMIC_ROUTE.getName();
    }

    @Override
    public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
        String appName = pluginChain.getAppName();
        ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
        // request service
        String url = buildUrl(exchange, serviceInstance);
        return forward(exchange, url);
    }

    /**
     * forward request to backend service
     *
     * @param exchange
     * @param url
     * @return
     */
    private Mono<Void> forward(ServerWebExchange exchange, String url) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        HttpMethod method = request.getMethod();

        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
            headers.addAll(request.getHeaders());
        });

        WebClient.RequestHeadersSpec<?> reqHeadersSpec;
        if (requireHttpBody(method)) {
            reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        } else {
            reqHeadersSpec = requestBodySpec;
        }
        // nio->callback->nio
        return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                .onErrorResume(ex -> {
                    return Mono.defer(() -> {
                        String errorResultJson = "";
                        if (ex instanceof TimeoutException) {
                            errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                        } else {
                            errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                        }
                        return ShipResponseUtil.doResponse(exchange, errorResultJson);
                    }).then(Mono.empty());
                }).flatMap(backendResponse -> {
                    response.setStatusCode(backendResponse.statusCode());
                    response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                    return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                });
    }

    /**
     * whether the http method needs an http body
     *
     * @param method
     * @return
     */
    private boolean requireHttpBody(HttpMethod method) {
        if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
            return true;
        }
        return false;
    }

    private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
        ServerHttpRequest request = exchange.getRequest();
        String query = request.getURI().getQuery();
        String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
        String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
        if (!StringUtils.isEmpty(query)) {
            url = url + "?" + query;
        }
        return url;
    }

    /**
     * choose a ServiceInstance according to route rule config and load balancing algorithm
     *
     * @param appName
     * @param request
     * @return
     */
    private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
        List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
        if (CollectionUtils.isEmpty(serviceInstances)) {
            LOGGER.error("service instance of {} not find", appName);
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        String version = matchAppVersion(appName, request);
        if (StringUtils.isEmpty(version)) {
            throw new ShipException("match app version error");
        }
        // filter serviceInstances by version
        List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
        // select an instance based on the load balancing algorithm
        LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
        ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
        return serviceInstance;
    }

    private String matchAppVersion(String appName, ServerHttpRequest request) {
        List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
        rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
        for (AppRuleDTO rule : rules) {
            if (match(rule, request)) {
                return rule.getVersion();
            }
        }
        return null;
    }

    private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
        String matchObject = rule.getMatchObject();
        String matchKey = rule.getMatchKey();
        String matchRule = rule.getMatchRule();
        Byte matchMethod = rule.getMatchMethod();
        if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
            return true;
        } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
            String param = request.getQueryParams().getFirst(matchKey);
            if (!StringUtils.isEmpty(param)) {
                return StringTools.match(param, matchMethod, matchRule);
            }
        } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
            HttpHeaders headers = request.getHeaders();
            String headerValue = headers.getFirst(matchKey);
            if (!StringUtils.isEmpty(headerValue)) {
                return StringTools.match(headerValue, matchMethod, matchRule);
            }
        }
        return false;
    }

}
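
The `match` method above delegates the actual comparison to `StringTools.match`, which is not shown in this article. As a rough idea of what the three match methods from section 2.2 (=, regex, like) could look like, here is a sketch. The numeric codes, the class name `RuleMatchSketch`, and the reading of "like" as substring containment are all assumptions.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of the =, regex and like match methods; codes and semantics are assumed. */
class RuleMatchSketch {

    static final byte EQUAL = 1; // assumed code for "="
    static final byte REGEX = 2; // assumed code for "regex"
    static final byte LIKE = 3;  // assumed code for "like"

    /** Compares a header or query value against the configured rule using the given method. */
    static boolean match(String value, byte method, String rule) {
        if (value == null || rule == null) {
            return false;
        }
        switch (method) {
            case EQUAL:
                return value.equals(rule);
            case REGEX:
                // the whole value must match the pattern
                return Pattern.matches(rule, value);
            case LIKE:
                // assumption: "like" means the value contains the rule text
                return value.contains(rule);
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(match("1.0", EQUAL, "1.0"));
        System.out.println(match("user-42", REGEX, "user-\\d+"));
        System.out.println(match("beta-tester", LIKE, "beta"));
    }
}
```

A gray release could then be expressed as a HEADER or QUERY rule that sends matching requests to the new version, with a lower-priority DEFAULT rule that sends everything else to the current one.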

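3.3 Data synchronization

App data synchronization

When a backend service (such as an order service) starts, it registers only its service name, version, IP address, and port with Nacos. The instance weight and the list of enabled plugins are missing, so how do they get there?

The weights and plugin lists of online instances are normally configured in the admin console and take effect dynamically, so ship-admin has to update the instance weights and plugin information in the registry on a schedule.

The corresponding code is NacosSyncListener in ship-admin: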

/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2020/12/30
 */
@Configuration
public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);

    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("nacos-sync", true).create());

    @NacosInjected
    private NamingService namingService;

    @Value("${nacos.discovery.server-addr}")
    private String baseUrl;

    @Resource
    private AppService appService;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
        scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
    }

    class NacosSyncTask implements Runnable {

        private NamingService namingService;

        private String url;

        private AppService appService;

        private Gson gson = new GsonBuilder().create();

        public NacosSyncTask(NamingService namingService, String url, AppService appService) {
            this.namingService = namingService;
            this.url = url;
            this.appService = appService;
        }

        /**
         * Regular update weight,enabled plugins to nacos instance
         */
        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                for (AppInfoDTO appInfo : appInfos) {
                    if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                        continue;
                    }
                    for (ServiceInstance instance : appInfo.getInstances()) {
                        Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                        String resp = OkhttpTool.doPut(url, queryMap, "");
                        LOGGER.debug("response :{}", resp);
                    }
                }

            } catch (Exception e) {
                LOGGER.error("nacos sync task error", e);
            }
        }

        private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
            Map<String, Object> map = new HashMap<>();
            map.put("serviceName", appInfo.getAppName());
            map.put("groupName", NacosConstants.APP_GROUP_NAME);
            map.put("ip", instance.getIp());
            map.put("port", instance.getPort());
            map.put("weight", instance.getWeight().doubleValue());
            NacosMetadata metadata = new NacosMetadata();
            metadata.setAppName(appInfo.getAppName());
            metadata.setVersion(instance.getVersion());
            metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
            map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
            map.put("ephemeral", true);
            return map;
        }
    }
}

ship-server then periodically pulls the app data from Nacos and updates its local Map cache.

/**
 * @Author: Ship
 * @Description: sync data to local cache
 * @Date: Created in 2020/12/25
 */
@Configuration
public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {

    private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
            new ShipThreadFactory("service-sync", true).create());

    @NacosInjected
    private NamingService namingService;

    @Autowired
    private ServerConfigProperties properties;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
        WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
        websocketSyncCacheServer.start();
    }

    class DataSyncTask implements Runnable {

        private NamingService namingService;

        public DataSyncTask(NamingService namingService) {
            this.namingService = namingService;
        }

        @Override
        public void run() {
            try {
                // get all app names
                ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                if (CollectionUtils.isEmpty(services.getData())) {
                    return;
                }
                List<String> appNames = services.getData();
                // get all instances
                for (String appName : appNames) {
                    List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                    if (CollectionUtils.isEmpty(instanceList)) {
                        continue;
                    }
                    ServiceCache.add(appName, buildServiceInstances(instanceList));
                    List<String> pluginNames = getEnabledPlugins(instanceList);
                    PluginCache.add(appName, pluginNames);
                }
                ServiceCache.removeExpired(appNames);
                PluginCache.removeExpired(appNames);

            } catch (NacosException e) {
                e.printStackTrace();
            }
        }

        private List<String> getEnabledPlugins(List<Instance> instanceList) {
            Instance instance = instanceList.get(0);
            Map<String, String> metadata = instance.getMetadata();
            // plugins: DynamicRoute,Auth
            String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
            return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
        }

        private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
            List<ServiceInstance> list = new LinkedList<>();
            instanceList.forEach(instance -> {
                Map<String, String> metadata = instance.getMetadata();
                ServiceInstance serviceInstance = new ServiceInstance();
                serviceInstance.setAppName(metadata.get("appName"));
                serviceInstance.setIp(instance.getIp());
                serviceInstance.setPort(instance.getPort());
                serviceInstance.setVersion(metadata.get("version"));
                serviceInstance.setWeight((int) instance.getWeight());
                list.add(serviceInstance);
            });
            return list;
        }
    }
}
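
`ServiceCache` itself is not shown in the article. A minimal sketch of a cache that supports the three calls used above (`add`, `getAllInstances`, `removeExpired`) could look like the following. The class name `ServiceCacheSketch`, the use of plain address strings instead of `ServiceInstance`, and the instance-based (non-static) design are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a local service cache keyed by appName. */
class ServiceCacheSketch {

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    /** Replaces the cached instance list for one app with the freshly pulled one. */
    void add(String appName, List<String> instances) {
        cache.put(appName, instances);
    }

    /** Returns the cached instances, or an empty list when the app is unknown. */
    List<String> getAllInstances(String appName) {
        return cache.getOrDefault(appName, Collections.<String>emptyList());
    }

    /** Evicts every app that is no longer present in the registry. */
    void removeExpired(List<String> onlineAppNames) {
        Set<String> online = new HashSet<>(onlineAppNames);
        cache.keySet().removeIf(appName -> !online.contains(appName));
    }

    public static void main(String[] args) {
        ServiceCacheSketch cache = new ServiceCacheSketch();
        cache.add("order", Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        cache.add("user", Arrays.asList("10.0.0.3:8080"));
        cache.removeExpired(Arrays.asList("order"));
        System.out.println(cache.getAllInstances("order"));
        System.out.println(cache.getAllInstances("user"));
    }
}
```

One thing to keep in mind with the task above: it returns early when Nacos reports no services at all, so in that case `removeExpired` is never reached and previously cached entries stay in place until a later run.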

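Route rule data synchronization

In addition, when a user updates a route rule in the admin console, ship-admin has to push the rule data to ship-server. Following the approach of the Soul gateway, this is done over WebSocket: a full synchronization runs once the connection is first established, and after that only incremental changes are synchronized whenever a route rule changes.

Server side, WebsocketSyncCacheServer: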

        /**
        ?*?@Author:?Ship
        ?*?@Description:
        ?*?@Date:?Created?in?2020/12/28
        ?*/

        public class WebsocketSyncCacheServer extends WebSocketServer {

            private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

            private Gson gson = new GsonBuilder().create();

            private MessageHandler messageHandler;

            public WebsocketSyncCacheServer(Integer port) {
                super(new InetSocketAddress(port));
                this.messageHandler = new MessageHandler();
            }


            @Override
            public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
                LOGGER.info("server is open");
            }

            @Override
            public void onClose(WebSocket webSocket, int i, String s, boolean b) {
                LOGGER.info("websocket server close...");
            }

            @Override
            public void onMessage(WebSocket webSocket, String message) {
                LOGGER.info("websocket server receive message:\n[{}]", message);
                this.messageHandler.handler(message);
            }

            @Override
            public void onError(WebSocket webSocket, Exception e) {
                // Log the failure instead of silently swallowing it
                LOGGER.error("websocket server error", e);
            }

            @Override
            public void onStart() {
                LOGGER.info("websocket server start...");
            }


            class MessageHandler {

                public void handler(String message) {
                    RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
                    // Ignore empty messages and messages that carry no rules
                    if (operationDTO == null || CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                        return;
                    }
                    // Group the rules by application name before updating the cache
                    Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                            .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
                    if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                            || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                        RouteRuleCache.add(map);
                    } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                        RouteRuleCache.remove(map);
                    }
                }
            }
        }
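The handler above delegates to RouteRuleCache.add and RouteRuleCache.remove with rules grouped by application name. As a rough illustration of what such a cache could look like, here is a minimal sketch. It is not the project's actual RouteRuleCache: the class name RuleCacheSketch, the reduced Rule type, and the "replace by id" semantics are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a rule cache; names and semantics are illustrative only. */
public class RuleCacheSketch {

    /** Reduced rule type carrying only the fields this sketch needs. */
    public static final class Rule {
        public final long id;
        public final String appName;

        public Rule(long id, String appName) {
            this.id = id;
            this.appName = appName;
        }
    }

    // appName -> immutable list of rules for that application
    private static final Map<String, List<Rule>> CACHE = new ConcurrentHashMap<>();

    /** INSERT/UPDATE: drop cached rules that share an id with an incoming rule, then append the incoming ones. */
    public static void add(Map<String, List<Rule>> grouped) {
        grouped.forEach((app, rules) -> CACHE.compute(app, (key, existing) -> {
            List<Rule> merged = new ArrayList<>();
            if (existing != null) {
                for (Rule old : existing) {
                    if (rules.stream().noneMatch(r -> r.id == old.id)) {
                        merged.add(old);
                    }
                }
            }
            merged.addAll(rules);
            return List.copyOf(merged);
        }));
    }

    /** DELETE: remove rules by id; the application entry disappears once it has no rules left. */
    public static void remove(Map<String, List<Rule>> grouped) {
        grouped.forEach((app, rules) -> CACHE.computeIfPresent(app, (key, existing) -> {
            List<Rule> kept = new ArrayList<>();
            for (Rule old : existing) {
                if (rules.stream().noneMatch(r -> r.id == old.id)) {
                    kept.add(old);
                }
            }
            // Returning null from computeIfPresent removes the mapping
            return kept.isEmpty() ? null : List.copyOf(kept);
        }));
    }

    /** Returns the cached rules for an application, or an empty list if none are cached. */
    public static List<Rule> get(String appName) {
        return CACHE.getOrDefault(appName, List.of());
    }
}
```

Storing immutable lists and swapping them atomically per key keeps readers on the request path lock-free, which suits a cache that is read on every request but updated rarely.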

        Client side, WebsocketSyncCacheClient:

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/28
         */

        @Component
        public class WebsocketSyncCacheClient {

            private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

            private WebSocketClient client;

            private RuleService ruleService;

            private Gson gson = new GsonBuilder().create();

            public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                            RuleService ruleService) {
                if (StringUtils.isEmpty(serverWebSocketUrl)) {
                    throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
                }
                this.ruleService = ruleService;
                ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                        new ShipThreadFactory("websocket-connect", true).create());
                try {
                    client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                        @Override
                        public void onOpen(ServerHandshake serverHandshake) {
                            LOGGER.info("client is open");
                            // On every (re)connect, push all enabled rules to the server
                            List<AppRuleDTO> list = ruleService.getEnabledRule();
                            String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                            send(msg);
                        }

                        @Override
                        public void onMessage(String s) {
                        }

                        @Override
                        public void onClose(int i, String s, boolean b) {
                        }

                        @Override
                        public void onError(Exception e) {
                            LOGGER.error("websocket client error", e);
                        }
                    };

                    client.connectBlocking();
                    // Use a scheduled thread pool to reconnect after a disconnect, checking every 30 seconds
                    executor.scheduleAtFixedRate(() -> {
                        if (client != null && client.isClosed()) {
                            try {
                                client.reconnectBlocking();
                            } catch (InterruptedException e) {
                                LOGGER.error("reconnect server fail", e);
                                // Preserve the interrupt status for the executor thread
                                Thread.currentThread().interrupt();
                            }
                        }
                    }, 10, 30, TimeUnit.SECONDS);

                } catch (Exception e) {
                    LOGGER.error("websocket sync cache exception", e);
                    throw new ShipException(e.getMessage());
                }
            }

            public <T> void send(T t) {
                // Busy-waits until the connection is open before sending
                while (!client.getReadyState().equals(ReadyState.OPEN)) {
                    LOGGER.debug("connecting ...please wait");
                }
                client.send(gson.toJson(t));
            }
        }
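The send method above spins in a tight loop until the connection is open, which burns CPU and never gives up if the server stays down. One possible alternative is a bounded wait that sleeps between checks. The sketch below is not part of ship-gate: the class AwaitOpenSketch, the method awaitTrue, and the timeout values are hypothetical names and numbers chosen for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: wait for a condition with a timeout instead of spinning. */
public class AwaitOpenSketch {

    /**
     * Polls the condition until it becomes true or the timeout elapses.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    public static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Inside send, the loop could then become something like `awaitTrue(() -> client.getReadyState().equals(ReadyState.OPEN), 5000, 50)`, with the caller deciding whether to throw or drop the message when it returns false. The 5-second timeout and 50 ms poll interval are arbitrary.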

        4. Testing

        4.1 Dynamic routing test

        1. Start Nacos locally: sh startup.sh -m standalone

        2. Start ship-admin.

        3. Start two ship-example instances locally.

          Configuration for instance 1:

          ship:
            http:
              app-name: order
              version: gray_1.0
              context-path: /order
              port: 8081
              admin-url: 127.0.0.1:9001

          server:
            port: 8081

          nacos:
            discovery:
              server-addr: 127.0.0.1:8848

          Configuration for instance 2:

          ship:
            http:
              app-name: order
              version: prod_1.0
              context-path: /order
              port: 8082
              admin-url: 127.0.0.1:9001

          server:
            port: 8082

          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
        4. Add a route rule to the database. The rule routes any request whose HTTP header contains name=ship to the nodes running version gray_1.0.

        5. Start ship-server. Once the following log appears, testing can begin.

          2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
          [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
        6. With Postman, send a POST request to http://localhost:9000/order/user/add with the header name=ship. Only instance 1 logs the request.

          ==========add user,version:gray_1.0
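For readers who prefer code to Postman, the same request from the last step of the routing test can be issued with the JDK's built-in HttpClient (Java 11+). This is only a sketch: the class name GrayRouteRequestSketch is made up, and the empty request body is an assumption, since the article does not show what /order/user/add expects.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client that reproduces the Postman request from the dynamic routing test. */
public class GrayRouteRequestSketch {

    /** Builds a POST to the gateway; the "name" header is what the route rule matches on. */
    public static HttpRequest buildRequest(String headerValue) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9000/order/user/add"))
                .header("name", headerValue)
                // Assumption: no body is sent; adjust if the endpoint needs a payload
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // With name=ship, the rule should route the request to the gray_1.0 instance
        HttpResponse<String> response =
                client.send(buildRequest("ship"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running main requires the gateway and both example instances to be up. Changing the header value to something other than ship is a quick way to check that the rule no longer matches.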

        4.2 Performance stress test

        Test environment:

        MacBook Pro, 13-inch

        Processor: 2.3 GHz quad-core Intel Core i7

        Memory: 16 GB 3733 MHz LPDDR4X

        Number of backend nodes: 1

        Load-testing tool: wrk

        Result: with 20 threads and 500 connections, throughput was roughly 9,400 requests per second.

        [Image: stress test result]

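        5. Summary

        A journey of a thousand miles begins with a single step. At first I assumed writing a gateway would be hard, but once you actually start you find it is not that difficult, so taking the first step matters. I ran into plenty of problems along the way and even filed two issues on GitHub against the open-source projects soul and nacos, only to discover later that the mistakes were my own, which was embarrassing.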
