1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        WebSocket 集群解決方案

        共 10811字,需瀏覽 22分鐘

         ·

        2021-10-24 17:06

        問題起因

        最近做項目時遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocket Session共享的問題。

        期間我經(jīng)過了幾天的研究,總結(jié)出了幾個實現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結(jié)出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。

        以下是我的場景描述

        • 資源:4臺服務器。其中只有一臺服務器具備ssl認證域名,一臺redis+mysql服務器,兩臺應用服務器(集群)
        • 應用發(fā)布限制條件:由于場景需要,應用場所需要ssl認證的域名才能發(fā)布。因此ssl認證的域名服務器用來當api網(wǎng)關(guān),負責https請求與wss(安全認證的ws)連接。俗稱https卸載,用戶請求https域名服務器(eg:https://oiscircle.com/xxx),但真實訪問到的是http+ip地址的形式。只要網(wǎng)關(guān)配置高,能handle多個應用
        • 需求:用戶登錄應用,需要與服務器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
        • 集群中的應用服務類型:每個集群實例都負責http無狀態(tài)請求服務與ws長連接服務

        系統(tǒng)架構(gòu)圖

        在我的實現(xiàn)里,每個應用服務器都負責http and ws請求,其實也可以將ws請求建立的聊天模型單獨成立為一個模塊。從分布式的角度來看,這兩種實現(xiàn)類型差不多,但從實現(xiàn)方便性來說,一個應用服務http+ws請求的方式更為方便。下文會有解釋

        本文涉及的技術(shù)棧

        • Eureka 服務發(fā)現(xiàn)與注冊
        • Redis Session共享
        • Redis 消息訂閱
        • Spring Boot
        • Zuul 網(wǎng)關(guān)
        • Spring Cloud Gateway 網(wǎng)關(guān)
        • Spring WebSocket 處理長連接
        • Ribbon 負載均衡
        • Netty 多協(xié)議NIO網(wǎng)絡通信框架
        • Consistent Hash 一致性哈希算法

        相信能走到這一步的人都了解過我上面列舉的技術(shù)棧了,如果還沒有,可以先去網(wǎng)上找找入門教程了解一下。下面的內(nèi)容都與上述技術(shù)相關(guān),題主默認大家都了解過了...

        技術(shù)可行性分析

        下面我將描述session特性,以及根據(jù)這些特性列舉出n個解決分布式架構(gòu)中處理ws請求的集群方案

        WebSocketSession與HttpSession

        在Spring所集成的WebSocket里面,每個ws連接都有一個對應的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過類似這樣的方式進行與客戶端的通信:

        protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
        ???System.out.println("服務器接收到的消息:?"+?message?);
        ???//send?message?to?client
        ???session.sendMessage(new?TextMessage("message"));
        }

        那么問題來了:ws的session無法序列化到redis,因此在集群中,我們無法將所有WebSocketSession都緩存到redis進行session共享。每臺服務器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。

        有的人可能會想:我可不可以將sessin關(guān)鍵信息緩存到redis,集群中的服務器從redis拿取session關(guān)鍵信息然后重新構(gòu)建websocket session...我只想說這種方法如果有人能試出來,請告訴我一聲...

        以上便是websocket session與http session共享的區(qū)別,總的來說就是http session共享已經(jīng)有解決方案了,而且很簡單,只要引入相關(guān)依賴:spring-session-data-redisspring-boot-starter-redis,大家可以從網(wǎng)上找個demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實現(xiàn)的方式,我們無法做到真正的websocket session共享。

        解決方案的演變

        Netty與Spring WebSocket

        剛開始的時候,我嘗試著用netty實現(xiàn)了websocket服務端的搭建。在netty里面,并沒有websocket session這樣的概念,與其類似的是channel,每一個客戶端連接都代表一個channel。前端的ws請求通過netty監(jiān)聽的端口,走websocket協(xié)議進行ws握手連接之后,通過一些列的handler(責鏈模式)進行消息處理。與websocket session類似地,服務端在連接建立后有一個channel,我們可以通過channel進行與客戶端的通信

        ???/**
        ????*?TODO?根據(jù)服務器傳進來的id,分配到不同的group
        ????*/

        ???private?static?final?ChannelGroup?GROUP?=?new?DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        ?
        ???@Override
        ???protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
        ???????//retain增加引用計數(shù),防止接下來的調(diào)用引用失效
        ???????System.out.println("服務器接收到來自?"?+?ctx.channel().id()?+?"?的消息:?"?+?msg.text());
        ???????//將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
        ???????GROUP.writeAndFlush(msg.retain());
        ???}

        那么,服務端用netty還是用spring websocket?以下我將從幾個方面列舉這兩種實現(xiàn)方式的優(yōu)缺點

        使用netty實現(xiàn)websocket

        玩過netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網(wǎng)絡線程模型是servlet實現(xiàn)的,而servlet不是nio模型,所以在spring5之后,spring的底層網(wǎng)絡實現(xiàn)采用了netty。如果我們單獨使用netty來開發(fā)websocket服務端,速度快是絕對的,但是可能會遇到下列問題:

        1. 與系統(tǒng)的其他應用集成不方便,在rpc調(diào)用的時候,無法享受springcloud里feign服務調(diào)用的便利性
        2. 業(yè)務邏輯可能要重復實現(xiàn)
        3. 使用netty可能需要重復造輪子
        4. 怎么連接上服務注冊中心,也是一件麻煩的事情
        5. restful服務與ws服務需要分開實現(xiàn),如果在netty上實現(xiàn)restful服務,有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習慣了。

        使用spring websocket實現(xiàn)ws服務

        spring websocket已經(jīng)被springboot很好地集成了,所以在springboot上開發(fā)ws服務非常方便,做法非常簡單

        第一步:添加依賴

        <dependency>
        ???<groupId>org.springframework.bootgroupId>
        ???<artifactId>spring-boot-starter-websocketartifactId>
        dependency>

        第二步:添加配置類

        @Configuration
        public?class?WebSocketConfig?implements?WebSocketConfigurer?{
        @Override
        public?void?registerWebSocketHandlers(WebSocketHandlerRegistry?registry)?{
        ????registry.addHandler(myHandler(),?"/")
        ????????.setAllowedOrigins("*");
        }
        ?
        @Bean
        ?public?WebSocketHandler?myHandler()?{
        ?????return?new?MessageHandler();
        ?}
        }

        第三步:實現(xiàn)消息監(jiān)聽類

        @Component
        @SuppressWarnings("unchecked")
        public?class?MessageHandler?extends?TextWebSocketHandler?{
        ???private?List?clients?=?new?ArrayList<>();
        ?
        ???@Override
        ???public?void?afterConnectionEstablished(WebSocketSession?session)?{
        ???????clients.add(session);
        ???????System.out.println("uri?:"?+?session.getUri());
        ???????System.out.println("連接建立:?"?+?session.getId());
        ???????System.out.println("current?seesion:?"?+?clients.size());
        ???}
        ?
        ???@Override
        ???public?void?afterConnectionClosed(WebSocketSession?session,?CloseStatus?status)?{
        ???????clients.remove(session);
        ???????System.out.println("斷開連接:?"?+?session.getId());
        ???}
        ?
        ???@Override
        ???protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
        ???????String?payload?=?message.getPayload();
        ???????Map?map?=?JSONObject.parseObject(payload,?HashMap.class);
        ???????System.out.println("接受到的數(shù)據(jù)"?+?map);
        ???????clients.forEach(s?->?{
        ???????????try?{
        ???????????????System.out.println("發(fā)送消息給:?"?+?session.getId());
        ???????????????s.sendMessage(new?TextMessage("服務器返回收到的信息,"?+?payload));
        ???????????}?catch?(Exception?e)?{
        ???????????????e.printStackTrace();
        ???????????}
        ???????});
        ???}
        }

        從這個demo中,使用spring websocket實現(xiàn)ws服務的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實現(xiàn)ws服務。

        因此我的應用服務架構(gòu)是這樣子的:一個應用既負責restful服務,也負責ws服務。沒有將ws服務模塊拆分是因為拆分出去要使用feign來進行服務調(diào)用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的io調(diào)用,所以就沒有這么做了。

        學習資料:Java進階視頻資源

        從zuul技術(shù)轉(zhuǎn)型到spring cloud gateway

        要實現(xiàn)websocket集群,我們必不可免地得從zuul轉(zhuǎn)型到spring cloud gateway。原因如下:

        zuul1.0版本不支持websocket轉(zhuǎn)發(fā),zuul 2.0開始支持websocket,zuul2.0幾個月前開源了,但是2.0版本沒有被spring boot集成,而且文檔不健全。因此轉(zhuǎn)型是必須的,同時轉(zhuǎn)型也很容易實現(xiàn)。

        在gateway中,為了實現(xiàn)ssl認證和動態(tài)路由負載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑

        server:
        ??port:?443
        ??ssl:
        ????enabled:?true
        ????key-store:?classpath:xxx.jks
        ????key-store-password:?xxxx
        ????key-store-type:?JKS
        ????key-alias:?alias
        spring:
        ??application:
        ????name:?api-gateway
        ??cloud:
        ????gateway:
        ??????httpclient:
        ????????ssl:
        ??????????handshake-timeout-millis:?10000
        ??????????close-notify-flush-timeout-millis:?3000
        ??????????close-notify-read-timeout-millis:?0
        ??????????useInsecureTrustManager:?true
        ??????discovery:
        ????????locator:
        ??????????enabled:?true
        ??????????lower-case-service-id:?true
        ??????routes:
        ??????-?id:?dc
        ????????uri:?lb://dc
        ????????predicates:
        ????????-?Path=/dc/**
        ??????-?id:?wecheck
        ????????uri:?lb://wecheck
        ????????predicates:
        ????????-?Path=/wecheck/**

        如果要愉快地玩https卸載,我們還需要配置一個filter,否則請求網(wǎng)關(guān)時會出現(xiàn)錯誤not an SSL/TLS record

        @Component
        public?class?HttpsToHttpFilter?implements?GlobalFilter,?Ordered?{
        ??private?static?final?int?HTTPS_TO_HTTP_FILTER_ORDER?=?10099;
        ??@Override
        ??public?Mono?filter(ServerWebExchange?exchange,?GatewayFilterChain?chain)?{
        ??????URI?originalUri?=?exchange.getRequest().getURI();
        ??????ServerHttpRequest?request?=?exchange.getRequest();
        ??????ServerHttpRequest.Builder?mutate?=?request.mutate();
        ??????String?forwardedUri?=?request.getURI().toString();
        ??????if?(forwardedUri?!=?null?&&?forwardedUri.startsWith("https"))?{
        ??????????try?{
        ??????????????URI?mutatedUri?=?new?URI("http",
        ??????????????????????originalUri.getUserInfo(),
        ??????????????????????originalUri.getHost(),
        ??????????????????????originalUri.getPort(),
        ??????????????????????originalUri.getPath(),
        ??????????????????????originalUri.getQuery(),
        ??????????????????????originalUri.getFragment());
        ??????????????mutate.uri(mutatedUri);
        ??????????}?catch?(Exception?e)?{
        ??????????????throw?new?IllegalStateException(e.getMessage(),?e);
        ??????????}
        ??????}
        ??????ServerHttpRequest?build?=?mutate.build();
        ??????ServerWebExchange?webExchange?=?exchange.mutate().request(build).build();
        ??????return?chain.filter(webExchange);
        ??}
        ?
        ??@Override
        ??public?int?getOrder()?{
        ??????return?HTTPS_TO_HTTP_FILTER_ORDER;
        ??}
        }

        這樣子我們就可以使用gateway來卸載https請求了,到目前為止,我們的基本框架已經(jīng)搭建完畢,網(wǎng)關(guān)既可以轉(zhuǎn)發(fā)https請求,也可以轉(zhuǎn)發(fā)wss請求。接下來就是用戶多對多之間session互通的通訊解決方案了。接下來,我將根據(jù)方案的優(yōu)雅性,從最不優(yōu)雅的方案開始講起。

        session廣播

        這是最簡單的websocket集群通訊解決方案。場景如下:

        教師A想要群發(fā)消息給他的學生們

        • 教師的消息請求發(fā)給網(wǎng)關(guān),內(nèi)容包含{我是教師A,我想把xxx消息發(fā)送我的學生們}
        • 網(wǎng)關(guān)接收到消息,獲取集群所有ip地址,逐個調(diào)用教師的請求
        • 集群中的每臺服務器獲取請求,根據(jù)教師A的信息查找本地有沒有與學生關(guān)聯(lián)的session,有則調(diào)用sendMessage方法,沒有則忽略請求

        session廣播實現(xiàn)很簡單,但是有一個致命缺陷:計算力浪費現(xiàn)象,當服務器沒有消息接收者session的時候,相當于浪費了一次循環(huán)遍歷的計算力,該方案在并發(fā)需求不高的情況下可以優(yōu)先考慮,實現(xiàn)很容易。

        spring cloud中獲取服務集群中每臺服務器信息的方法如下

        @Resource
        private?EurekaClient?eurekaClient;
        ?
        Application?app?=?eurekaClient.getApplication("service-name");
        //instanceInfo包括了一臺服務器ip,port等消息
        InstanceInfo?instanceInfo?=?app.getInstances().get(0);
        System.out.println("ip?address:?"?+?instanceInfo.getIPAddr());

        服務器需要維護關(guān)系映射表,將用戶的id與session做映射,session建立時在映射表中添加映射關(guān)系,session斷開后要刪除映射表內(nèi)關(guān)聯(lián)關(guān)系

        一致性哈希算法實現(xiàn)(本文的要點)

        這種方法是本人認為最優(yōu)雅的實現(xiàn)方案,理解這種方案需要一定的時間,如果你耐心看下去,相信你一定會有所收獲。再強調(diào)一次,不了解一致性哈希算法的同學請先看這里,現(xiàn)先假設哈希環(huán)是順時針查找的。

        首先,想要將一致性哈希算法的思想應用到我們的websocket集群,我們需要解決以下新問題:

        • 集群節(jié)點DOWN,會影響到哈希環(huán)映射到狀態(tài)是DOWN的節(jié)點。
        • 集群節(jié)點UP,會影響到舊key映射不到對應的節(jié)點。
        • 哈希環(huán)讀寫共享。

        在集群中,總會出現(xiàn)服務UP/DOWN的問題。

        針對節(jié)點DOWN的問題分析如下:

        一個服務器DOWN的時候,其擁有的websocket session會自動關(guān)閉連接,并且前端會收到通知。此時會影響到哈希環(huán)的映射錯誤。我們只需要當監(jiān)聽到服務器DOWN的時候,刪除哈希環(huán)上面對應的實際結(jié)點和虛結(jié)點,避免讓網(wǎng)關(guān)轉(zhuǎn)發(fā)到狀態(tài)是DOWN的服務器上。

        實現(xiàn)方法:在eureka治理中心監(jiān)聽集群服務DOWN事件,并及時更新哈希環(huán)。

        針對節(jié)點UP的問題分析如下:

        現(xiàn)假設集群中有服務 CacheB上線了,該服務器的ip地址剛好被映射到key1和 cacheA之間。那么key1對應的用戶每次要發(fā)消息時都跑去 CacheB發(fā)送消息,結(jié)果明顯是發(fā)送不了消息,因為 CacheB沒有key1對應的session。

        此時我們有兩種解決方案。

        方案A簡單,動作大:

        eureka監(jiān)聽到節(jié)點UP事件之后,根據(jù)現(xiàn)有集群信息,更新哈希環(huán)。并且斷開所有session連接,讓客戶端重新連接,此時客戶端會連接到更新后的哈希環(huán)節(jié)點,以此避免消息無法送達的情況。

        方案B復雜,動作?。?/strong>

        我們先看看沒有虛擬節(jié)點的情況,假設 CacheC和 CacheA之間上線了服務器 CacheB。所有映射在 CacheC到 CacheB的用戶發(fā)消息時都會去 CacheB里面找session發(fā)消息。也就是說 CacheB一但上線,便會影響到 CacheC到 CacheB之間的用戶發(fā)送消息。所以我們只需要將 CacheA斷開 CacheC到 CacheB的用戶所對應的session,讓客戶端重連。

        接下來是有虛擬節(jié)點的情況,假設淺色的節(jié)點是虛擬節(jié)點。我們用長括號來代表某段區(qū)域映射的結(jié)果屬于某個 Cache。首先是C節(jié)點未上線的情況。圖大家應該都懂吧,所有B的虛擬節(jié)點都會指向真實的B節(jié)點,所以所有B節(jié)點逆時針那一部分都會映射到B(因為我們規(guī)定哈希環(huán)順時針查找)。

        接下來是C節(jié)點上線的情況,可以看到某些區(qū)域被C占領(lǐng)了。

        由以上情況我們可以知道:節(jié)點上線,會有許多對應虛擬節(jié)點也同時上線,因此我們需要將多段范圍key對應的session斷開連接(上圖紅色的部分)。具體算法有點復雜,實現(xiàn)的方式因人而異,大家可以嘗試一下自己實現(xiàn)算法。

        哈希環(huán)應該放在哪里?

        • gateway本地創(chuàng)建并維護哈希環(huán)。當ws請求進來的時候,本地獲取哈希環(huán)并獲取映射服務器信息,轉(zhuǎn)發(fā)ws請求。這種方法看上去不錯,但實際上是不太可取的,回想一下上面服務器DOWN的時候只能通過eureka監(jiān)聽,那么eureka監(jiān)聽到DOWN事件之后,需要通過io來通知gateway刪除對應節(jié)點嗎?顯然太麻煩了,將eureka的職責分散到gateway,不建議這么做。
        • eureka創(chuàng)建,并放到redis共享讀寫。這個方案可行,當eureka監(jiān)聽到服務DOWN的時候,修改哈希環(huán)并推送到redis上。為了請求響應時間盡量地短,我們不可以讓gateway每次轉(zhuǎn)發(fā)ws請求的時候都去redis取一次哈希環(huán)。哈希環(huán)修改的概率的確很低,gateway只需要應用redis的消息訂閱模式,訂閱哈希環(huán)修改事件便可以解決此問題。

        至此我們的spring websocket集群已經(jīng)搭建的差不多了,最重要的地方還是一致性哈希算法?,F(xiàn)在有最后一個技術(shù)瓶頸,網(wǎng)關(guān)如何根據(jù)ws請求轉(zhuǎn)發(fā)到指定的集群服務器上?

        答案在負載均衡。spring cloud gateway或zuul都默認集成了ribbon作為負載均衡,我們只需要根據(jù)建立ws請求時客戶端發(fā)來的user id,重寫ribbon負載均衡算法,根據(jù)user id進行hash,并在哈希環(huán)上尋找ip,并將ws請求轉(zhuǎn)發(fā)到該ip便完事了。流程如下圖所示:

        接下來用戶溝通的時候,只需要根據(jù)id進行hash,在哈希環(huán)上獲取對應ip,便可以知道與該用戶建立ws連接時的session存在哪臺服務器上了!

        spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

        題主在實際操作的時候發(fā)現(xiàn)了ribbon兩個不完善的地方......

        • 根據(jù)網(wǎng)上找的方法,繼承AbstractLoadBalancerRule重寫負載均衡策略之后,多個不同應用的請求變得混亂。假如eureka上有兩個service A和B,重寫負載均衡策略之后,請求A或B的服務,最終只會映射到其中一個服務上。非常奇怪!可能spring cloud gateway官網(wǎng)需要給出一個正確的重寫負載均衡策略的demo。
        • 一致性哈希算法需要一個key,類似user id,根據(jù)key進行hash之后在哈希環(huán)上搜索并返回ip。但是ribbon沒有完善choose函數(shù)的key參數(shù),直接寫死了default!

        難道這樣子我們就沒有辦法了嗎?其實還有一個可行并且暫時可替代的辦法!

        如下圖所示,客戶端發(fā)送一個普通的http請求(包含id參數(shù))給網(wǎng)關(guān),網(wǎng)關(guān)根據(jù)id進行hash,在哈希環(huán)中尋找ip地址,將ip地址返回給客戶端,客戶端再根據(jù)該ip地址進行ws請求。

        由于ribbon未完善key的處理,我們暫時無法在ribbon上實現(xiàn)一致性哈希算法。只能間接地通過客戶端發(fā)起兩次請求(一次http,一次ws)的方式來實現(xiàn)一致性哈希。希望不久之后ribbon能更新這個缺陷!讓我們的websocket集群實現(xiàn)得更優(yōu)雅一點。

        后記

        以上便是我這幾天探索的結(jié)果。期間遇到了許多問題,并逐一解決難題,列出兩個websocket集群解決方案。第一個是session廣播,第二個是一致性哈希。

        這兩種方案針對不同場景各有優(yōu)缺點,本文并未用到ActiveMQ,Karfa等消息隊列實現(xiàn)消息推送,只是想通過自己的想法,不依靠消息隊列來簡單地實現(xiàn)多用戶之間的長連接通訊。希望能為大家提供一條不同于尋常的思路。

        來源:blog.csdn.net/weixin_34194702/

        article/details/88701309

        推薦閱讀:

        世界的真實格局分析,地球人類社會底層運行原理

        不是你需要中臺,而是一名合格的架構(gòu)師(附各大廠中臺建設PPT)

        企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案

        論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

        企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

        【中臺實踐】華為大數(shù)據(jù)中臺架構(gòu)分享.pdf

        華為的數(shù)字化轉(zhuǎn)型方法論

        華為如何實施數(shù)字化轉(zhuǎn)型(附PPT)

        超詳細280頁Docker實戰(zhàn)文檔!開放下載

        華為大數(shù)據(jù)解決方案(PPT)

        瀏覽 46
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91亚洲欧美激情 | 欧美性爱xxxx黑人xyx久久 | 国产午夜视频在线观看 | 国产一级做受视频 | 男人插女人的屁股 | 日韩人妻电影 | 国产精品国产三级国产aⅴ入口 | 国内免费无码操妣 | 欧美一级黄色逼图大全 | 操12p|