1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        一個(gè)注解,優(yōu)雅的實(shí)現(xiàn) WebSocket 集群!

        共 5733字,需瀏覽 12分鐘

         ·

        2023-08-31 09:12

        d269129b1aab1a35ca00ba3b384be403.webp程序員的成長(zhǎng)之路互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享  關(guān)注


        閱讀本文大概需要 8 分鐘。

        來自:blog.csdn.net/m0_64360721/article/details/125597943

        介紹

        WebSocket大家應(yīng)該是再熟悉不過了,如果是單體應(yīng)用確實(shí)不會(huì)有什么問題,但是當(dāng)我們的項(xiàng)目使用微服務(wù)架構(gòu)時(shí),就可能會(huì)存在問題比如服務(wù)A有兩個(gè)實(shí)例A1和A2,前端的WebSocket客戶端C通過網(wǎng)關(guān)的負(fù)載均衡連到了A1,這個(gè)時(shí)候當(dāng)A2觸發(fā)消息發(fā)送的邏輯,需要將某個(gè)消息發(fā)送給所有的客戶端時(shí),C就接受不到消息這個(gè)時(shí)候我們很快就能想到一種最簡(jiǎn)單的解決方案,就是把A2的消息轉(zhuǎn)發(fā)給A1,A1再把消息發(fā)送給C,這樣C就能收到A2發(fā)送的消息了 7fc87e03273cd288fafbe9bddc825c69.webp基于這個(gè)思路,我實(shí)現(xiàn)了一個(gè)庫(kù),一個(gè)配置注解搞定一切
        • 傳送門:https://github.com/Linyuzai/concept/wiki/Concept-WebSocket-LoadBalance
        ?

        用法

        接下來讓我們看看這個(gè)庫(kù)的用法 首先我們需要在啟動(dòng)類上添加一個(gè)注解@EnableWebSocketLoadBalanceConcept
            

        @EnableWebSocketLoadBalanceConcept
        @EnableDiscoveryClient
        @SpringBootApplication
        public class AServiceApplication {

            public static void main(String[] args) {
                SpringApplication.run(AServiceApplication.classargs);
            }
        }

        接著我們?cè)谛枰l(fā)送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨實(shí)例發(fā)消息啦
            

        @RestController
        @RequestMapping("/ws")
        public class WsController {

            @Autowired
            private WebSocketLoadBalanceConcept concept;

            @RequestMapping("/send")
            public void send(@RequestParam String msg) {
                concept.send(msg);
            }
        }

        是不是很簡(jiǎn)單,有沒有覺得比自己集成單體應(yīng)用的WebSocket還要簡(jiǎn)單!當(dāng)你的同事還在頭疼要實(shí)現(xiàn)手動(dòng)轉(zhuǎn)發(fā)時(shí)你已經(jīng)通過一個(gè)配置注解實(shí)現(xiàn)了功能并開始泡茶喝你的同事肯定對(duì)你刮目相看?。ㄓ帜荛_始摸魚了)不知道大家看了之后是不是對(duì)具體實(shí)現(xiàn)已經(jīng)有了一些思路呢接下來我就來講講這個(gè)庫(kù)的實(shí)現(xiàn)流程

        抽象思路

        其實(shí)我之前有專門針對(duì)WebSocket實(shí)現(xiàn)過類似功能的模塊,只是當(dāng)時(shí)的一些場(chǎng)景都是基于項(xiàng)目定死的,所以相對(duì)來說實(shí)現(xiàn)比較簡(jiǎn)單,但是過于定制化不好擴(kuò)展有一天在和我的一個(gè)前同事聊天的過程中得知,他們?cè)诳紤]讓設(shè)備和服務(wù)直連,并且服務(wù)要部署成多實(shí)例,設(shè)備和服務(wù)直連無非就是通過TCP這種長(zhǎng)連接來實(shí)現(xiàn),可以使用緩存來保存連接和服務(wù)地址的映射關(guān)系來實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)轉(zhuǎn)發(fā)的功能需求?聽到這里,是不是感覺似曾相識(shí)?當(dāng)時(shí)就有一道光穿過我的腦瓜子,真相只有一個(gè)!這不就和WebSocket在集群模式下的問題一樣么于是我從原來針對(duì)WebSocket的思考,變成了對(duì)各種長(zhǎng)連接的思考,最終我將這個(gè)問題抽象成了:長(zhǎng)連接的集群方案而不管是WebSocket還是TCP都是長(zhǎng)連接的一種具體實(shí)現(xiàn)所以我們可以抽象一個(gè)頂級(jí)接口Connection,然后實(shí)現(xiàn)WebSocketConnection或者是TCPConnection其實(shí)從抽象的角度來說不僅僅是長(zhǎng)連接,短連接也在我們的抽象范圍之內(nèi),只不過類似HTTP等協(xié)議并不存在上述的問題,但是并不妨礙你實(shí)現(xiàn)一個(gè)HTTPConnection用于轉(zhuǎn)發(fā)消息,所以大家不要被先入為主的思維束縛住了

        轉(zhuǎn)發(fā)思路

        之前講到,這個(gè)庫(kù)的主要思路就是將消息轉(zhuǎn)發(fā)給其他的服務(wù)實(shí)例來達(dá)到一個(gè)單播或廣播的效果所以消息轉(zhuǎn)發(fā)的設(shè)計(jì)就非常重要了首先消息轉(zhuǎn)發(fā)需要憑借一些支持?jǐn)?shù)據(jù)交互的技術(shù)手段比如HTTP,MQ,TCP,WebSocket說到這里。。。大家是不是。。。你TM原來自己就能搞定?。ㄏ谱溃╅L(zhǎng)連接不就是用來交互數(shù)據(jù)的嗎,所以完全可以自給自足啊于是就有一個(gè)精妙的想法在我腦子里形成:如果每個(gè)服務(wù)實(shí)例都把自己作為一個(gè)客戶端,連接到其他服務(wù)上呢?
        • WebSocket的場(chǎng)景下,我們將當(dāng)前服務(wù)實(shí)例作為一個(gè)WebSocket客戶端去連接其他服務(wù)實(shí)例的WebSocket服務(wù)端

        • TCP的場(chǎng)景下,我們將當(dāng)前服務(wù)實(shí)例作為一個(gè)TCP的客戶端去連接其他服務(wù)實(shí)例的TCP服務(wù)端

        這樣其他服務(wù)實(shí)例就可以把消息發(fā)到這些偽裝的客戶端上,當(dāng)服務(wù)實(shí)例上偽裝的客戶端接收到消息之后就可以再轉(zhuǎn)發(fā)給自己管理的真正的客戶端撒花家人們,自閉(自我閉環(huán))了屬于是所以我們首先需要先讓服務(wù)實(shí)例之間相互連接上

        連接流程

        讓我們來看看互相建立連接是怎么設(shè)計(jì)的 02e5c11fe78192bade873abe25ef5794.webp我定義了一個(gè)ConnectionSubscriber的接口,大家可以理解為我們的服務(wù)實(shí)例要去訂閱監(jiān)聽其他服務(wù)發(fā)送的消息同時(shí)提供了默認(rèn)實(shí)現(xiàn),就是基于自身的協(xié)議進(jìn)行連接和消息的發(fā)送當(dāng)然也能夠靈活的支持其他方式,只需要自定義一個(gè)ConnectionSubscriber就可以了,如果使用MQ的方式就可以實(shí)現(xiàn)一個(gè)MQConnectionSubscriber或者使用HTTP就可以實(shí)現(xiàn)一個(gè)HTTPConnectionSubscriber只不過使用自身的協(xié)議就可以不用依賴其他的庫(kù)或是中間件了,當(dāng)然如果你對(duì)消息的丟失率有比較嚴(yán)格的要求也可以使用MQ作為消息轉(zhuǎn)發(fā)的中介,而以我之前參與過的項(xiàng)目來說,一般普通的WebSocket場(chǎng)景基本上還是能忍受一定的丟失率的

        獲取服務(wù)實(shí)例信息

        那么我們?cè)趺粗酪ミB接哪些實(shí)例呢我定義了一個(gè)ConnectionServerManager的接口用來管理服務(wù)信息當(dāng)然我們完全可以自己實(shí)現(xiàn)一個(gè),比如通過配置文件來配置服務(wù)實(shí)例信息不過我們有更方便的方式,那就是依賴Spring Cloud的服務(wù)發(fā)現(xiàn)組件了,不管是Eureka還是Nacos還是其他的注冊(cè)中心相當(dāng)于都支持了,這就是抽象的魅力啊我們可以通過DiscoveryClient#getInstances(Registration.getServiceId())來獲得所有的實(shí)例,排除掉自身就是需要連接的服務(wù)實(shí)例了當(dāng)我們的服務(wù)實(shí)例連接上其他的服務(wù)實(shí)例之后,發(fā)送一個(gè)自身實(shí)例信息的消息過去,其他的服務(wù)實(shí)例接收到對(duì)應(yīng)的消息之后反過來連接我們的服務(wù)實(shí)例,保證一定的連接及時(shí)性,這樣雙方的連接就搭建起來了,可以互相轉(zhuǎn)發(fā)消息了同時(shí)我還添加了心跳檢測(cè)和自動(dòng)重連,當(dāng)一段時(shí)間沒有收到心跳回復(fù)后就會(huì)斷開連接,并且每隔一段時(shí)間就會(huì)重新查詢一遍實(shí)例信息,如果發(fā)現(xiàn)存在某個(gè)服務(wù)實(shí)例沒有對(duì)應(yīng)的連接,就會(huì)重新進(jìn)行連接,這樣就能在某些偶爾網(wǎng)絡(luò)不好的情況下有一定的容錯(cuò)到目前為止,我們基本的框架已經(jīng)建立了,當(dāng)我們啟動(dòng)服務(wù)之后,服務(wù)間就會(huì)自動(dòng)建立連接?

        連接區(qū)分和管理

        基于上述的思路,我們肯定需要區(qū)分真實(shí)的客戶端和用來轉(zhuǎn)發(fā)的客戶端于是我就把這些連接做了一個(gè)分類 53f083d8f199c9723b499b36493163e8.webp然后對(duì)于這些連接進(jìn)行一個(gè)統(tǒng)一的管理 2c3c5f6107a97b573c74fbf315113962.webp通過連接工廠ConnectionFactory我們可以將任意的連接適配成Connection對(duì)象,并實(shí)現(xiàn)各種連接間的消息轉(zhuǎn)發(fā)每個(gè)連接都會(huì)配置一個(gè)MessageEncoder和MessageDecoder用于消息的編碼和解碼,而且不同類別的連接對(duì)應(yīng)的編碼器和解碼器肯定是不一樣的,比如轉(zhuǎn)發(fā)的消息和發(fā)給真實(shí)客戶端的消息很大程度上都是有區(qū)別的,所以額外定義了一個(gè)MessageCodecAdapter用來適配不同類型的編解碼器,也能讓大家在自定義時(shí)方便管理

        消息發(fā)送

        現(xiàn)在當(dāng)我們發(fā)送某條消息之后,消息就會(huì)被轉(zhuǎn)發(fā)到其他的服務(wù)實(shí)例,所有的客戶端就都能收到了不對(duì)啊,在有些情況下我們不想讓所有客戶端都收到啊,能不能我們想讓誰收到就讓誰收到啊真麻煩,來,我把所有的連接都給你,你自己選吧

        連接選擇

        我們需要在消息發(fā)送時(shí)確定發(fā)送給哪些連接 14f5361d6fb5bcf930f75c7a6a7f1d63.webp于是我就定義了一個(gè)連接選擇器ConnectionSelector每次要發(fā)送消息的時(shí)候,我都會(huì)匹配一個(gè)連接選擇器,然后通過選擇器來獲得需要發(fā)送消息的連接,而我們可以通過自定義連接選擇器來實(shí)現(xiàn)我們消息的精準(zhǔn)發(fā)送這里其實(shí)就是我為什么會(huì)取名WebSocketLoadBalanceConcept的原因,為什么要叫LoadBalance呢Ribbon通過IRule來選擇一個(gè)Server我通過ConnectionSelector來選擇一個(gè)Connection集合是不是有異曲同工之妙繼續(xù)來說自定義選擇器準(zhǔn)備工作:
        • 我們的Connection有一個(gè)metadata字段用于存放自定義屬性

        • 我們的Message有一個(gè)headers字段用于存放消息頭

        給指定用戶發(fā)送消息 很多場(chǎng)景下我們需要給指定的用戶發(fā)送消息首先當(dāng)客戶端連接上來時(shí),可以通過參數(shù)或者主動(dòng)發(fā)送一個(gè)消息將userId發(fā)給服務(wù)端,然后服務(wù)端將得到的userId存在Connection的metadata中接著我們給需要發(fā)送的Message添加一個(gè)header,將對(duì)應(yīng)的userId作為消息頭這樣我們就可以自定義一個(gè)連接選擇器通過判斷Message是否包含userId消息頭來作為匹配的條件,當(dāng)Message的headers中存在userId時(shí),對(duì)Connection中的metadata進(jìn)行userId的匹配來篩選需要發(fā)送消息的連接由于userId是唯一的,當(dāng)我們自身服務(wù)連上來的客戶端中已經(jīng)匹配到就不需要再轉(zhuǎn)發(fā)了,如果沒有匹配到就通過其他服務(wù)實(shí)例的客戶端進(jìn)行消息轉(zhuǎn)發(fā)庫(kù)中已經(jīng)實(shí)現(xiàn)了對(duì)應(yīng)的UserSelector和UserMessage,可以使用配置開啟并通過在連接路徑上添加userId參數(shù)來標(biāo)記用戶當(dāng)然我們也可以借用緩存來精確的判斷需不需要轉(zhuǎn)發(fā)或者是需要轉(zhuǎn)發(fā)給哪幾個(gè)服務(wù),把userId和服務(wù)的instanceId等一些具有唯一性的數(shù)據(jù)緩存在Redis中,當(dāng)給用戶發(fā)送消息時(shí),從Redis中獲得用戶對(duì)應(yīng)的服務(wù)實(shí)例的instanceId或是具有唯一性的數(shù)據(jù),如果經(jīng)過匹配就是當(dāng)前服務(wù)就可以直接下發(fā),如果是其他服務(wù)就轉(zhuǎn)發(fā)給那個(gè)對(duì)應(yīng)的服務(wù)就行了給指定路徑發(fā)送消息 還有一種場(chǎng)景也比較常見就是類似主題訂閱,如訂閱設(shè)備狀態(tài)更新的數(shù)據(jù),就要給每一個(gè)對(duì)應(yīng)路徑的連接發(fā)送消息了我們可以使用不同的路徑來表示不同主題,然后自定義一個(gè)連接選擇器來匹配連接的路徑和消息頭中指定的路徑當(dāng)然庫(kù)中也已經(jīng)實(shí)現(xiàn)了對(duì)應(yīng)的PathSelector和PathMessage,可以通過配置開啟

        結(jié)束

        最后請(qǐng)?jiān)试S我發(fā)表一點(diǎn)對(duì)于抽象的拙見抽象其實(shí)就和 “道生一,一生二,二生三,三生萬物” 一樣,根據(jù)你的頂級(jí)接口(也就是核心功能)不斷的向外展開,你的頂級(jí)接口就是道(狹義的來講)以這個(gè)庫(kù)為例,ConnectionLoadBalanceConcept就是這個(gè)庫(kù)的道,他的核心功能就是發(fā)送消息,至于怎么發(fā),發(fā)給誰,不確定,像是一個(gè)混沌的狀態(tài)那么什么是一,二,三呢,我們發(fā)送消息需要載體于是就有了Connection和Message,我們需要對(duì)Connection進(jìn)行管理于是就有了ConnectionRepository, 我們需要轉(zhuǎn)發(fā)消息于是就有了ConnectionSubscriber等等而萬物就像是具體的實(shí)現(xiàn),是能落實(shí)的,基于Spring Cloud服務(wù)發(fā)現(xiàn)的連接管理器DiscoveryConnectionServerManager,基于路徑的連接選擇器PathSelector,基于Reactive的WebSocket連接ReactiveWebSocketConnection就像是你創(chuàng)造的世界,不斷的衍生出各種各樣的規(guī)則,這些規(guī)則相輔相成,讓你的世界平穩(wěn)的運(yùn)行當(dāng)然你的世界也有可能存在bug,手動(dòng)狗頭。 <END>

        推薦閱讀:

        程序員看劇,如果看到編程場(chǎng)景,會(huì)暫停看代碼嗎?

        為什么 List 原生排序比 stream() 流效率更高?

            
                互聯(lián)網(wǎng)初中高級(jí)大廠面試題(9個(gè)G)
              
            

        內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊(duì)列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!

        ?戳閱讀原文領(lǐng)?。?/span>                                   朕已閱  f5b61771d96638484da31f6b885f3701.webp

        瀏覽 131
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            欧美激情在线狂野欧美精品 | 日韩无码操逼 | 女超人h版成c人版在线观看 | 国产激情无码视频 | 男女交性45分钟视频 | 公交车的激情 | 男人j桶女人p | 日韩黄色网址 | 午夜寂寞少妇aa**毛片 | 黄色日比片 |