1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        阿里終面:如何設計一個高性能網關?

        共 63603字,需瀏覽 128分鐘

         ·

        2021-03-17 14:16

        點擊關注公眾號,Java干貨及時送達

        作者:煙味i
        鏈接:https://cnblogs.com/2YSP/p/14223892.html

        一、前言

        最近在github上看了soul網關的設計,突然就來了興趣準備自己從零開始寫一個高性能的網關。

        經過兩周時間的開發(fā),我的網關ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理后臺??。

        二、設計

        2.1 技術選型

        網關是所有請求的入口,所以要求有很高的吞吐量,為了實現這點可以使用請求異步化來解決。

        目前一般有以下兩種方案:

        • Tomcat/Jetty+NIO+Servlet3

        Servlet3已經支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。

        • Netty+NIO

        Netty為高并發(fā)而生,目前唯品會的網關使用這個策略,在唯品會的技術文章中在相同的情況下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己處理HTTP協(xié)議,這一塊比較麻煩。

        后面發(fā)現Soul網關是基于Spring WebFlux(底層Netty)的,不用太關心HTTP協(xié)議的處理,于是決定也用Spring WebFlux。

        網關的第二個特點是具備可擴展性,比如Netflix Zuul有preFilters,postFilters等在不同的階段方便處理不同的業(yè)務,基于責任鏈模式將請求進行鏈式處理即可實現。

        在微服務架構下,服務都會進行多實例部署來保證高可用,請求到達網關時,網關需要根據URL找到所有可用的實例,這時就需要服務注冊和發(fā)現功能,即注冊中心。

        現在流行的注冊中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點小眾),因為之前寫RPC框架時已經用過了Zookeeper,所以這次就選擇了Nacos。

        2.2 需求清單

        首先要明確目標,即開發(fā)一個具備哪些特性的網關,總結下后如下:

        • 自定義路由規(guī)則

          可基于version的路由規(guī)則設置,路由對象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。

        • 跨語言

          HTTP協(xié)議天生跨語言

        • 高性能

          Netty本身就是一款高性能的通信框架,同時server將一些路由規(guī)則等數據緩存到JVM內存避免請求admin服務。

        • 高可用

          支持集群模式防止單節(jié)點故障,無狀態(tài)。

        • 灰度發(fā)布

          灰度發(fā)布(又名金絲雀發(fā)布)是指在黑與白之間,能夠平滑過渡的一種發(fā)布方式。在其上可以進行A/B testing,即讓一部分用戶繼續(xù)用產品特性A,一部分用戶開始用產品特性B,如果用戶對B沒有什么反對意見,那么逐步擴大范圍,把所有用戶都遷移到B上面來。通過特性一可以實現。

        • 接口鑒權

          基于責任鏈模式,用戶開發(fā)自己的鑒權插件即可。

        • 負載均衡

          支持多種負載均衡算法,如隨機,輪詢,加權輪詢等。利用SPI機制可以根據配置進行動態(tài)加載。

        2.3 架構設計

        在參考了一些優(yōu)秀的網關Zuul,Spring Cloud Gateway,Soul后,將項目劃分為以下幾個模塊。

        名稱描述
        ship-admin后臺管理界面,配置路由規(guī)則等
        ship-server網關服務端,核心功能模塊
        ship-client-spring-boot-starter網關客戶端,自動注冊服務信息到注冊中心
        ship-common一些公共的代碼,如pojo,常量等。

        它們之間的關系如圖:

        注意: 這張圖與實際實現有點出入,Nacos push到本地緩存的那個環(huán)節(jié)沒有實現,目前只有ship-sever定時輪詢pull的過程。ship-admin從Nacos獲取注冊服務信息的過程,也改成了ServiceA啟動時主動發(fā)生HTTP請求通知ship-admin。

        2.4 表結構設計

        三、編碼

        3.1 ship-client-spring-boot-starter

        首先創(chuàng)建一個spring-boot-starter命名為ship-client-spring-boot-starter,不知道如何自定義starter的可以看我以前寫的《開發(fā)自己的starter》。

        更多 Spring Boot 教程推薦看這個:

        https://github.com/javastacks/spring-boot-best-practice

        其核心類 AutoRegisterListener 就是在項目啟動時做了兩件事:

        1.將服務信息注冊到Nacos注冊中心

        2.通知ship-admin服務上線了并注冊下線hook。

        代碼如下:

        /**
         * Created by 2YSP on 2020/12/21
         */

        public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent{
         
            private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
         
            private volatile AtomicBoolean registered = new AtomicBoolean(false);
         
            private final ClientConfigProperties properties;
         
            @NacosInjected
            private NamingService namingService;
         
            @Autowired
            private RequestMappingHandlerMapping handlerMapping;
         
            private final ExecutorService pool;
         
            /**
             * url list to ignore
             */

            private static List<String> ignoreUrlList = new LinkedList<>();
         
            static {
                ignoreUrlList.add("/error");
            }
         
            public AutoRegisterListener(ClientConfigProperties properties) {
                if (!check(properties)) {
                    LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
                    throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
                }
                this.properties = properties;
                pool = new ThreadPoolExecutor(140, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            }
         
            /**
             * check the ClientConfigProperties
             *
             * @param properties
             * @return
             */

            private boolean check(ClientConfigProperties properties) {
                if (properties.getPort() == null || properties.getContextPath() == null
                        || properties.getVersion() == null || properties.getAppName() == null
                        || properties.getAdminUrl() == null) {
                    return false;
                }
                return true;
            }
         
         
            @Override
            public void onApplicationEvent(ContextRefreshedEvent event) {
                if (!registered.compareAndSet(falsetrue)) {
                    return;
                }
                doRegister();
                registerShutDownHook();
            }
         
            /**
             * send unregister request to admin when jvm shutdown
             */

            private void registerShutDownHook() {
                final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
                final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
                unregisterAppDTO.setAppName(properties.getAppName());
                unregisterAppDTO.setVersion(properties.getVersion());
                unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
                unregisterAppDTO.setPort(properties.getPort());
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    OkhttpTool.doPost(url, unregisterAppDTO);
                    LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
                }));
            }
         
            /**
             * register all interface info to register center
             */

            private void doRegister() {
                Instance instance = new Instance();
                instance.setIp(IpUtil.getLocalIpAddress());
                instance.setPort(properties.getPort());
                instance.setEphemeral(true);
                Map<String, String> metadataMap = new HashMap<>();
                metadataMap.put("version", properties.getVersion());
                metadataMap.put("appName", properties.getAppName());
                instance.setMetadata(metadataMap);
                try {
                    namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
                } catch (NacosException e) {
                    LOGGER.error("register to nacos fail", e);
                    throw new ShipException(e.getErrCode(), e.getErrMsg());
                }
                LOGGER.info("register interface info to nacos success!");
                // send register request to ship-admin
                String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
                RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
                OkhttpTool.doPost(url, registerAppDTO);
                LOGGER.info("register to ship-admin success!");
            }
         
         
            private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
                RegisterAppDTO registerAppDTO = new RegisterAppDTO();
                registerAppDTO.setAppName(properties.getAppName());
                registerAppDTO.setContextPath(properties.getContextPath());
                registerAppDTO.setIp(instance.getIp());
                registerAppDTO.setPort(instance.getPort());
                registerAppDTO.setVersion(properties.getVersion());
                return registerAppDTO;
            }
        }
         

        3.2 ship-server

        ship-sever項目主要包括了兩個部分內容:

        1.請求動態(tài)路由的主流程

        2.本地緩存數據和ship-admin及nacos同步,這部分在后面3.3再講。

        ship-server實現動態(tài)路由的原理是利用WebFilter攔截請求,然后將請求教給plugin chain去鏈式處理。

        PluginFilter根據URL解析出appName,然后將啟用的plugin組裝成plugin chain。

        最新 Java 核心技術教程,都在這了!

        public class PluginFilter implements WebFilter {
         
            private ServerConfigProperties properties;
         
            public PluginFilter(ServerConfigProperties properties) {
                this.properties = properties;
            }
         
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
                String appName = parseAppName(exchange);
                if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
                    throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                }
                PluginChain pluginChain = new PluginChain(properties, appName);
                pluginChain.addPlugin(new DynamicRoutePlugin(properties));
                pluginChain.addPlugin(new AuthPlugin(properties));
                return pluginChain.execute(exchange, pluginChain);
            }
         
            private String parseAppName(ServerWebExchange exchange) {
                RequestPath path = exchange.getRequest().getPath();
                String appName = path.value().split("/")[1];
                return appName;
            }
        }

        PluginChain繼承了AbstractShipPlugin并持有所有要執(zhí)行的插件。

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/25
         */

        public class PluginChain extends AbstractShipPlugin {
            /**
             * the pos point to current plugin
             */

            private int pos;
            /**
             * the plugins of chain
             */

            private List<ShipPlugin> plugins;
         
            private final String appName;
         
            public PluginChain(ServerConfigProperties properties, String appName) {
                super(properties);
                this.appName = appName;
            }
         
            /**
             * add enabled plugin to chain
             *
             * @param shipPlugin
             */

            public void addPlugin(ShipPlugin shipPlugin) {
                if (plugins == null) {
                    plugins = new ArrayList<>();
                }
                if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
                    return;
                }
                plugins.add(shipPlugin);
                // order by the plugin's order
                plugins.sort(Comparator.comparing(ShipPlugin::order));
            }
         
            @Override
            public Integer order() {
                return null;
            }
         
            @Override
            public String name() {
                return null;
            }
         
            @Override
            public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                if (pos == plugins.size()) {
                    return exchange.getResponse().setComplete();
                }
                return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
            }
         
            public String getAppName() {
                return appName;
            }
         
        }
         

        AbstractShipPlugin實現了ShipPlugin接口,并持有ServerConfigProperties配置對象。

        public abstract class AbstractShipPlugin implements ShipPlugin {
         
            protected ServerConfigProperties properties;
         
            public AbstractShipPlugin(ServerConfigProperties properties) {
                this.properties = properties;
            }
        }

        ShipPlugin接口定義了所有插件必須實現的三個方法order(),name()和execute()。

        public interface ShipPlugin {
            /**
             * lower values have higher priority
             *
             * @return
             */

            Integer order();
         
            /**
             * return current plugin name
             *
             * @return
             */

            String name();
         
            Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
         
        }

        DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動態(tài)路由的主要業(yè)務邏輯。

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/25
         */

        public class DynamicRoutePlugin extends AbstractShipPlugin {
         
            private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
         
            private static WebClient webClient;
         
            private static final Gson gson = new GsonBuilder().create();
         
            static {
                HttpClient httpClient = HttpClient.create()
                        .tcpConfiguration(client ->
                                client.doOnConnected(conn ->
                                        conn.addHandlerLast(new ReadTimeoutHandler(3))
                                                .addHandlerLast(new WriteTimeoutHandler(3)))
                                        .option(ChannelOption.TCP_NODELAY, true)
                        );
                webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                        .build();
            }
         
            public DynamicRoutePlugin(ServerConfigProperties properties) {
                super(properties);
            }
         
            @Override
            public Integer order() {
                return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
            }
         
            @Override
            public String name() {
                return ShipPluginEnum.DYNAMIC_ROUTE.getName();
            }
         
            @Override
            public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
                String appName = pluginChain.getAppName();
                ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
        //        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
                // request service
                String url = buildUrl(exchange, serviceInstance);
                return forward(exchange, url);
            }
         
            /**
             * forward request to backend service
             *
             * @param exchange
             * @param url
             * @return
             */

            private Mono<Void> forward(ServerWebExchange exchange, String url) {
                ServerHttpRequest request = exchange.getRequest();
                ServerHttpResponse response = exchange.getResponse();
                HttpMethod method = request.getMethod();
         
                WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
                    headers.addAll(request.getHeaders());
                });
         
                WebClient.RequestHeadersSpec<?> reqHeadersSpec;
                if (requireHttpBody(method)) {
                    reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
                } else {
                    reqHeadersSpec = requestBodySpec;
                }
                // nio->callback->nio
                return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                        .onErrorResume(ex -> {
                            return Mono.defer(() -> {
                                String errorResultJson = "";
                                if (ex instanceof TimeoutException) {
                                    errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                                } else {
                                    errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                                }
                                return ShipResponseUtil.doResponse(exchange, errorResultJson);
                            }).then(Mono.empty());
                        }).flatMap(backendResponse -> {
                            response.setStatusCode(backendResponse.statusCode());
                            response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                            return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                        });
            }
         
            /**
             * weather the http method need http body
             *
             * @param method
             * @return
             */

            private boolean requireHttpBody(HttpMethod method) {
                if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
                    return true;
                }
                return false;
            }
         
            private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
                ServerHttpRequest request = exchange.getRequest();
                String query = request.getURI().getQuery();
                String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
                String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
                if (!StringUtils.isEmpty(query)) {
                    url = url + "?" + query;
                }
                return url;
            }
         
         
            /**
             * choose an ServiceInstance according to route rule config and load balancing algorithm
             *
             * @param appName
             * @param request
             * @return
             */

            private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
                List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
                if (CollectionUtils.isEmpty(serviceInstances)) {
                    LOGGER.error("service instance of {} not find", appName);
                    throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
                }
                String version = matchAppVersion(appName, request);
                if (StringUtils.isEmpty(version)) {
                    throw new ShipException("match app version error");
                }
                // filter serviceInstances by version
                List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
                //Select an instance based on the load balancing algorithm
                LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
                ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
                return serviceInstance;
            }
         
         
            private String matchAppVersion(String appName, ServerHttpRequest request) {
                List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
                rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
                for (AppRuleDTO rule : rules) {
                    if (match(rule, request)) {
                        return rule.getVersion();
                    }
                }
                return null;
            }
         
         
            private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
                String matchObject = rule.getMatchObject();
                String matchKey = rule.getMatchKey();
                String matchRule = rule.getMatchRule();
                Byte matchMethod = rule.getMatchMethod();
                if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
                    return true;
                } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
                    String param = request.getQueryParams().getFirst(matchKey);
                    if (!StringUtils.isEmpty(param)) {
                        return StringTools.match(param, matchMethod, matchRule);
                    }
                } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
                    HttpHeaders headers = request.getHeaders();
                    String headerValue = headers.getFirst(matchKey);
                    if (!StringUtils.isEmpty(headerValue)) {
                        return StringTools.match(headerValue, matchMethod, matchRule);
                    }
                }
                return false;
            }
         
        }
         

        3.3 數據同步

        app數據同步

        后臺服務(如訂單服務)啟動時,只將服務名,版本,ip地址和端口號注冊到了Nacos,并沒有實例的權重和啟用的插件信息怎么辦?

        一般在線的實例權重和插件列表都是在管理界面配置,然后動態(tài)生效的,所以需要ship-admin定時更新實例的權重和插件信息到注冊中心。

        對應代碼ship-admin的NacosSyncListener

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/30
         */

        @Configuration
        public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent{
         
            private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
         
            private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                    new ShipThreadFactory("nacos-sync"true).create());
         
            @NacosInjected
            private NamingService namingService;
         
            @Value("${nacos.discovery.server-addr}")
            private String baseUrl;
         
            @Resource
            private AppService appService;
         
            @Override
            public void onApplicationEvent(ContextRefreshedEvent event) {
                if (event.getApplicationContext().getParent() != null) {
                    return;
                }
                String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
                scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 030L, TimeUnit.SECONDS);
            }
         
            class NacosSyncTask implements Runnable {
         
                private NamingService namingService;
         
                private String url;
         
                private AppService appService;
         
                private Gson gson = new GsonBuilder().create();
         
                public NacosSyncTask(NamingService namingService, String url, AppService appService) {
                    this.namingService = namingService;
                    this.url = url;
                    this.appService = appService;
                }
         
                /**
                 * Regular update weight,enabled plugins to nacos instance
                 */

                @Override
                public void run() {
                    try {
                        // get all app names
                        ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                        if (CollectionUtils.isEmpty(services.getData())) {
                            return;
                        }
                        List<String> appNames = services.getData();
                        List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                        for (AppInfoDTO appInfo : appInfos) {
                            if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                                continue;
                            }
                            for (ServiceInstance instance : appInfo.getInstances()) {
                                Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                                String resp = OkhttpTool.doPut(url, queryMap, "");
                                LOGGER.debug("response :{}", resp);
                            }
                        }
         
                    } catch (Exception e) {
                        LOGGER.error("nacos sync task error", e);
                    }
                }
         
                private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("serviceName", appInfo.getAppName());
                    map.put("groupName", NacosConstants.APP_GROUP_NAME);
                    map.put("ip", instance.getIp());
                    map.put("port", instance.getPort());
                    map.put("weight", instance.getWeight().doubleValue());
                    NacosMetadata metadata = new NacosMetadata();
                    metadata.setAppName(appInfo.getAppName());
                    metadata.setVersion(instance.getVersion());
                    metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
                    map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
                    map.put("ephemeral"true);
                    return map;
                }
            }
        }
         

        ship-server再定時從Nacos拉取app數據更新到本地Map緩存。

        /**
         * @Author: Ship
         * @Description: sync data to local cache
         * @Date: Created in 2020/12/25
         */

        @Configuration
        public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent{
         
            private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
                    new ShipThreadFactory("service-sync"true).create());
         
            @NacosInjected
            private NamingService namingService;
         
            @Autowired
            private ServerConfigProperties properties;
         
            @Override
            public void onApplicationEvent(ContextRefreshedEvent event) {
                if (event.getApplicationContext().getParent() != null) {
                    return;
                }
                scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                        , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
                WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
                websocketSyncCacheServer.start();
            }
         
         
            class DataSyncTask implements Runnable {
         
                private NamingService namingService;
         
                public DataSyncTask(NamingService namingService) {
                    this.namingService = namingService;
                }
         
                @Override
                public void run() {
                    try {
                        // get all app names
                        ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                        if (CollectionUtils.isEmpty(services.getData())) {
                            return;
                        }
                        List<String> appNames = services.getData();
                        // get all instances
                        for (String appName : appNames) {
                            List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                            if (CollectionUtils.isEmpty(instanceList)) {
                                continue;
                            }
                            ServiceCache.add(appName, buildServiceInstances(instanceList));
                            List<String> pluginNames = getEnabledPlugins(instanceList);
                            PluginCache.add(appName, pluginNames);
                        }
                        ServiceCache.removeExpired(appNames);
                        PluginCache.removeExpired(appNames);
         
                    } catch (NacosException e) {
                        e.printStackTrace();
                    }
                }
         
                private List<String> getEnabledPlugins(List<Instance> instanceList) {
                    Instance instance = instanceList.get(0);
                    Map<String, String> metadata = instance.getMetadata();
                    // plugins: DynamicRoute,Auth
                    String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
                    return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
                }
         
                private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
                    List<ServiceInstance> list = new LinkedList<>();
                    instanceList.forEach(instance -> {
                        Map<String, String> metadata = instance.getMetadata();
                        ServiceInstance serviceInstance = new ServiceInstance();
                        serviceInstance.setAppName(metadata.get("appName"));
                        serviceInstance.setIp(instance.getIp());
                        serviceInstance.setPort(instance.getPort());
                        serviceInstance.setVersion(metadata.get("version"));
                        serviceInstance.setWeight((int) instance.getWeight());
                        list.add(serviceInstance);
                    });
                    return list;
                }
            }
        }
         

        路由規(guī)則數據同步

        同時,如果用戶在管理后臺更新了路由規(guī)則,ship-admin需要推送規(guī)則數據到ship-server,這里參考了soul網關的做法利用websocket在第一次建立連接后進行全量同步,此后路由規(guī)則發(fā)生變更就只作增量同步。

        最新 Java 核心技術教程,都在這了!

        服務端WebsocketSyncCacheServer:

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/28
         */

        public class WebsocketSyncCacheServer extends WebSocketServer {
         
            private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
         
            private Gson gson = new GsonBuilder().create();
         
            private MessageHandler messageHandler;
         
            public WebsocketSyncCacheServer(Integer port) {
                super(new InetSocketAddress(port));
                this.messageHandler = new MessageHandler();
            }
         
         
            @Override
            public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
                LOGGER.info("server is open");
            }
         
            @Override
            public void onClose(WebSocket webSocket, int i, String s, boolean b) {
                LOGGER.info("websocket server close...");
            }
         
            @Override
            public void onMessage(WebSocket webSocket, String message) {
                LOGGER.info("websocket server receive message:\n[{}]", message);
                this.messageHandler.handler(message);
            }
         
            @Override
            public void onError(WebSocket webSocket, Exception e) {
         
            }
         
            @Override
            public void onStart() {
                LOGGER.info("websocket server start...");
            }
         
         
            class MessageHandler {
         
                public void handler(String message) {
                    RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
                    if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                        return;
                    }
                    Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                            .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
                    if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                            || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                        RouteRuleCache.add(map);
                    } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                        RouteRuleCache.remove(map);
                    }
                }
            }
        }
         

        客戶端WebsocketSyncCacheClient:

        /**
         * @Author: Ship
         * @Description:
         * @Date: Created in 2020/12/28
         */

        @Component
        public class WebsocketSyncCacheClient {
         
            private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
         
            private WebSocketClient client;
         
            private RuleService ruleService;
         
            private Gson gson = new GsonBuilder().create();
         
            public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
                                            RuleService ruleService) 
        {
                if (StringUtils.isEmpty(serverWebSocketUrl)) {
                    throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
                }
                this.ruleService = ruleService;
                ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                        new ShipThreadFactory("websocket-connect"true).create());
                try {
                    client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                        @Override
                        public void onOpen(ServerHandshake serverHandshake) {
                            LOGGER.info("client is open");
                            List<AppRuleDTO> list = ruleService.getEnabledRule();
                            String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                            send(msg);
                        }
         
                        @Override
                        public void onMessage(String s) {
                        }
         
                        @Override
                        public void onClose(int i, String s, boolean b) {
                        }
         
                        @Override
                        public void onError(Exception e) {
                            LOGGER.error("websocket client error", e);
                        }
                    };
         
                    client.connectBlocking();
                    //使用調度線程池進行斷線重連,30秒進行一次
                    executor.scheduleAtFixedRate(() -> {
                        if (client != null && client.isClosed()) {
                            try {
                                client.reconnectBlocking();
                            } catch (InterruptedException e) {
                                LOGGER.error("reconnect server fail", e);
                            }
                        }
                    }, 1030, TimeUnit.SECONDS);
         
                } catch (Exception e) {
                    LOGGER.error("websocket sync cache exception", e);
                    throw new ShipException(e.getMessage());
                }
            }
         
            public <T> void send(T t) {
                while (!client.getReadyState().equals(ReadyState.OPEN)) {
                    LOGGER.debug("connecting ...please wait");
                }
                client.send(gson.toJson(t));
            }
        }
         

        四、測試

        4.1 動態(tài)路由測試

        1)本地啟動nacos ,sh startup.sh -m standalone

        2)啟動ship-admin

        3)本地啟動兩個ship-example實例。

        實例1配置:

        ship:
         http:
           app-name: order
           version: gray_1.0
           context-path: /order
           port: 8081
           admin-url: 127.0.0.1:9001

        server:
         port: 8081

        nacos:
         discovery:
           server-addr: 127.0.0.1:8848

        實例2配置:

        ship:
          http:
            app-name: order
            version: prod_1.0
            context-path: /order
            port: 8082
            admin-url: 127.0.0.1:9001

        server:
          port: 8082

        nacos:
          discovery:
            server-addr: 127.0.0.1:8848
        4)在數據庫添加路由規(guī)則配置,該規(guī)則表示當http header 中的name=ship時請求路由到gray_1.0版本的節(jié)點。
        啟動ship-server,看到以下日志時則可以進行測試了。
           2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
           [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
        用Postman請求http://localhost:9000/order/user/add,POST方式,header設置name=ship,可以看到只有實例1有日志顯示。
        ==========add user,version:gray_1.0

        4.2 性能壓測

        壓測環(huán)境:

        MacBook Pro 13英寸

        處理器 2.3 GHz 四核Intel Core i7

        內存 16 GB 3733 MHz LPDDR4X

        后端節(jié)點個數一個

        壓測工具:wrk

        壓測結果:20個線程,500個連接數,吞吐量大概每秒9400個請求。

        五、總結

        千里之行始于足下,開始以為寫一個網關會很難,但當你實際開始行動時就會發(fā)現其實沒那么難,所以邁出第一步很重要。過程中也遇到了很多問題,還在github上給soul和nacos這兩個開源項目提了兩個issue,后來發(fā)現是自己的問題,尷尬??。

        本文代碼已全部上傳到 github:https://github.com/2YSP/ship-gate,最后,希望此文對你有所幫助。

        參考資料:

        https://nacos.io/zh-cn/docs/quick-start.html
        https://dromara.org/website/zh-cn/docs/soul/soul.html
        https://docs.spring.io/spring-framework/docs/5.1.7.RELEASE/spring-framework-reference/web-reactive.html#webflux
        https://github.com/TooTallNate/Java-WebSocket






        關注Java技術??锤喔韶?/strong>



        獲取 Spring Boot 實戰(zhàn)筆記!
        瀏覽 36
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            美女插逼网站 | 男生下面插女生下面 | 国产精品无码久久久久久久 | 午夜性爱福利视频 | 猛操极品 | 奇米影视亚洲 | 欧美无人区码AAAAA | 啊啊啊啊啊好大好深 | 成人做爰无码A片韩国电影网斗生 | 一边亲一边摸免费观看30分钟 |