【多人聊天室】WebSocket集群/分布式改造

本文公眾號來源:Rude3Knife?
作者:蠻三刀把刀
前言書接上文,我們開始對我們的小小聊天室進(jìn)行集群化改造。
本文內(nèi)容摘要:
為何要改造為分布式集群
如何改造為分布式集群
用戶在聊天室集群如何發(fā)消息
用戶在聊天室集群如何接收消息
補(bǔ)充知識點:STOMP 簡介
功能一:向聊天室集群中的全體用戶發(fā)消息——Redis的訂閱/發(fā)布
功能二:集群集群用戶上下線通知——Redis訂閱發(fā)布
功能三:集群用戶信息維護(hù)——Redis集合
WebSocket集群還有哪些可能性
本文源碼:(媽媽再也不用擔(dān)心我無法復(fù)現(xiàn)文章代碼啦)
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E9%9B%86%E7%BE%A4%E7%89%88
正文WebSocket集群/分布式改造:實現(xiàn)多人在線聊天室
為何要改造為分布式集群
分布式就是為了解決單點故障問題,想象一下,如果一個服務(wù)器承載了1000個大佬同時聊天,服務(wù)器突然掛了,1000個大佬瞬間全部掉線,大概明天你就被大佬們吊起來打了。
當(dāng)聊天室改為集群后,就算服務(wù)器A掛了,服務(wù)器B上聊天的大佬們還可以愉快的聊天,并且在前端還能通過代碼,讓連接A的大佬們快速重連至存活的服務(wù)器B,繼續(xù)和大家愉快的聊天,豈不美哉!
總結(jié)一下:實現(xiàn)了分布式WebSocket后,我們可以將流量負(fù)載均衡到不同的服務(wù)器上并提供一種通信機(jī)制讓各個服務(wù)器能進(jìn)行消息同步(不然用戶A連上服務(wù)器A,用戶B臉上服務(wù)器B,它們發(fā)消息的時候?qū)Ψ蕉紱]法收到)。
如何改造為分布式集群
當(dāng)我們要實現(xiàn)分布式的時候,我們則需要在各個機(jī)器上共享這些信息,所以我們需要一個Publish/Subscribe的中間件。我們現(xiàn)在使用Redis作為我們的解決方案。
1. 用戶在聊天室集群如何發(fā)消息
假設(shè)我們的聊天室集群有服務(wù)器A和B,用戶Alice連接在A上,Bob連接在B上、
Alice向聊天室的服務(wù)器A發(fā)送消息,A服務(wù)器必須要將收到的消息轉(zhuǎn)發(fā)到Redis,才能保證聊天室集群的所有服務(wù)器(也就是A和B)能夠拿到消息。否則,只有Alice在的服務(wù)器A能夠讀到消息,用戶Bob在的服務(wù)器B并不能收到消息,A和B也就無法聊天了。
2. 用戶在聊天室集群如何接收消息
說完了發(fā)送消息,那么如何保證Alice發(fā)的消息,其他所有人都能收到呢,前面我們知道了Alice發(fā)送的消息已經(jīng)被傳到了Redis的頻道,那么所有服務(wù)器都必須訂閱這個Redis頻道,然后把這個頻道的消息轉(zhuǎn)發(fā)到自己的用戶那里,這樣自己服務(wù)器所管轄的用戶就能收到消息。
補(bǔ)充知識點:STOMP 簡介
上期我們搭建了個websocket聊天室demo,并且使用了STOMP協(xié)議,但是我并沒有介紹到底什么是STOMP協(xié)議,同學(xué)們會有疑惑,這里對于STOMP有很好地總結(jié):
當(dāng)直接使用WebSocket時(或SockJS)就很類似于使用TCP套接字來編寫Web應(yīng)用。因為沒有高層級的線路協(xié)議(wire protocol),因此就需要我們定義應(yīng)用之間所發(fā)送消息的語義,還需要確保連接的兩端都能遵循這些語義。
就像HTTP在TCP套接字之上添加了請求-響應(yīng)模型層一樣,STOMP在WebSocket之上提供了一個基于幀的線路格式(frame-based wire format)層,用來定義消息的語義。
與HTTP請求和響應(yīng)類似,STOMP幀由命令、一個或多個頭信息以及負(fù)載所組成。例如,如下就是發(fā)送數(shù)據(jù)的一個STOMP幀:
>>>?SEND
transaction:tx-0
destination:/app/marco
content-length:20
{"message":"Marco!"}
好了,介紹完了概念,讓我們開始動手改造!
功能一:向聊天室集群中的全體用戶發(fā)消息——Redis的訂閱/發(fā)布
如果你不熟悉Redis的sub/pub(訂閱/發(fā)布)功能,請看這里進(jìn)行簡單了解它的用法,很簡單:
https://redisbook.readthedocs.io/en/latest/feature/pubsub.html
在我們上篇文章的Demo基礎(chǔ)上,我們進(jìn)行集群改造。上一篇文章的源碼見下方:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E5%8D%95%E6%9C%BA%E7%89%88
1. 添加Redis依賴pom
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
2. application.properties新增redis配置
當(dāng)然首先要確保你安裝了Redis,windows下安裝redis比較麻煩,你可以搜索redis-for-windows下載安裝。
#?redis?連接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
#?空閑連接最大數(shù)
spring.redis.jedis.pool.max-idle=10
#?獲取連接最大等待時間(s)
spring.redis.jedis.pool.max-wait=60000
3. 在application.properties添加頻道名定義
#?Redis定義
redis.channel.msgToAll?=?websocket.msgToAll
4. 新建redis/RedisListenerBean
package?cn.monitor4all.springbootwebsocketdemo.redis;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Value;
import?org.springframework.context.annotation.Bean;
import?org.springframework.data.redis.connection.RedisConnectionFactory;
import?org.springframework.data.redis.listener.PatternTopic;
import?org.springframework.data.redis.listener.RedisMessageListenerContainer;
import?org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import?org.springframework.stereotype.Component;
import?java.net.Inet4Address;
import?java.net.InetAddress;
/**
?*?Redis訂閱頻道屬性類
?*?@author?yangzhendong01
?*/
@Component
public?class?RedisListenerBean?{
????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(RedisListenerBean.class);
????@Value("${server.port}")
????private?String?serverPort;
????@Value("${redis.channel.msgToAll}")
????private?String?msgToAll;
????/**
?????*?redis消息監(jiān)聽器容器
?????*?可以添加多個監(jiān)聽不同話題的redis監(jiān)聽器,只需要把消息監(jiān)聽器和相應(yīng)的消息訂閱處理器綁定,該消息監(jiān)聽器
?????*?通過反射技術(shù)調(diào)用消息訂閱處理器的相關(guān)方法進(jìn)行一些業(yè)務(wù)處理
?????*?@param?connectionFactory
?????*?@param?listenerAdapter
?????*?@return
?????*/
????@Bean
????RedisMessageListenerContainer?container(RedisConnectionFactory?connectionFactory,?MessageListenerAdapter?listenerAdapter)?{
????????RedisMessageListenerContainer?container?=?new?RedisMessageListenerContainer();
????????container.setConnectionFactory(connectionFactory);
????????//?監(jiān)聽msgToAll
????????container.addMessageListener(listenerAdapter,?new?PatternTopic(msgToAll));
????????LOGGER.info("Subscribed?Redis?channel:?"?+?msgToAll);
????????return?container;
????}
}
可以看到,我們在代碼里監(jiān)聽了redis頻道m(xù)sgToAll,這個是在application.properties定義的,當(dāng)然如果你懶得定義,這里可以寫死。
5. 聊天室集群:發(fā)消息改造
我們單機(jī)聊天室的發(fā)送消息Controller是這樣的:
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
????public?ChatMessage?sendMessage(@Payload?ChatMessage?chatMessage)?{
????????return?chatMessage;
我們前端發(fā)給我們消息后,直接給/topic/public轉(zhuǎn)發(fā)這個消息,讓其他用戶收到。
在集群中,我們需要把消息轉(zhuǎn)發(fā)給Redis,并且不轉(zhuǎn)發(fā)給前端,而是讓服務(wù)端監(jiān)聽Redis消息,在進(jìn)行消息發(fā)送。
將Controller改為:
@Value("${redis.channel.msgToAll}")
private?String?msgToAll;
@Autowired
private?RedisTemplate?redisTemplate;
@MessageMapping("/chat.sendMessage")
????public?void?sendMessage(@Payload?ChatMessage?chatMessage)?{
????????try?{
????????????redisTemplate.convertAndSend(msgToAll,?JsonUtil.parseObjToJson(chatMessage));
????????}?catch?(Exception?e)?{
????????????LOGGER.error(e.getMessage(),?e);
????????}
????}
你會發(fā)現(xiàn)我們在代碼中使用了JsonUtil將實體類ChatMessage轉(zhuǎn)為了Json發(fā)送給了Redis,這個Json工具類需要使用到FaskJson依賴:
pom添加FastJson依賴
<dependency>
????<groupId>com.alibabagroupId>
????<artifactId>fastjsonartifactId>
????<version>1.2.58version>
dependency>
添加Json解析工具類JsonUtil,提供對象轉(zhuǎn)Json,Json轉(zhuǎn)對象的能力
package?cn.monitor4all.springbootwebsocketdemo.util;
import?com.alibaba.fastjson.JSON;
import?com.alibaba.fastjson.JSONObject;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
/**
?*?JSON?轉(zhuǎn)換
?*/
public?final?class?JsonUtil?{
????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(JsonUtil.class);
????/**
?????*?把Java對象轉(zhuǎn)換成json字符串
?????*
?????*?@param?object?待轉(zhuǎn)化為JSON字符串的Java對象
?????*?@return?json?串?or?null
?????*/
????public?static?String?parseObjToJson(Object?object)?{
????????String?string?=?null;
????????try?{
????????????string?=?JSONObject.toJSONString(object);
????????}?catch?(Exception?e)?{
????????????LOGGER.error(e.getMessage());
????????}
????????return?string;
????}
????/**
?????*?將Json字符串信息轉(zhuǎn)換成對應(yīng)的Java對象
?????*
?????*?@param?json?json字符串對象
?????*?@param?c????對應(yīng)的類型
?????*/
????public?static??T?parseJsonToObj(String?json,?Class?c)?{
????????try?{
????????????JSONObject?jsonObject?=?JSON.parseObject(json);
????????????return?JSON.toJavaObject(jsonObject,?c);
????????}?catch?(Exception?e)?{
????????????LOGGER.error(e.getMessage());
????????}
????????return?null;
????}
}
這樣,我們接收到用戶發(fā)送消息的請求時,就將消息轉(zhuǎn)發(fā)給了redis的頻道websocket.msgToAll
6. 聊天室集群:接收消息改造
單機(jī)的聊天室,我們接收消息是通過Controller直接把消息轉(zhuǎn)發(fā)到所有人的頻道上,這樣就能在所有人的聊天框顯示。
在集群中,我們需要服務(wù)器把消息從Redis中拿出來,并且推送到自己管的用戶那邊,我們在Service層實現(xiàn)消息的推送。
在處理消息之后發(fā)送消息:
正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 注解可以處理客戶端發(fā)送過來的消息,并選擇方法是否有返回值。
如果 @MessageMapping注解的控制器方法有返回值的話,返回值會被發(fā)送到消息代理,只不過會添加上"/topic"前綴??梢允褂聾SendTo 重寫消息目的地;
如果 @SubscribeMapping注解的控制器方法有返回值的話,返回值會直接發(fā)送到客戶端,不經(jīng)過代理。如果加上@SendTo 注解的話,則要經(jīng)過消息代理。在應(yīng)用的任意地方發(fā)送消息:
spring-websocket 定義了一個 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以實現(xiàn)自由的向任意目的地發(fā)送消息,并且訂閱此目的地的所有用戶都能收到消息。
我們在service實現(xiàn)發(fā)送,需要使用上述第二種方法。
新建類service/ChatService:
package?cn.monitor4all.springbootwebsocketdemo.service;
import?cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.messaging.handler.annotation.Payload;
import?org.springframework.messaging.simp.SimpMessageSendingOperations;
import?org.springframework.stereotype.Service;
@Service
public?class?ChatService?{
????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(ChatService.class);
????@Autowired
????private?SimpMessageSendingOperations?simpMessageSendingOperations;
????public?void?sendMsg(@Payload?ChatMessage?chatMessage)?{
????????LOGGER.info("Send?msg?by?simpMessageSendingOperations:"?+?chatMessage.toString());
????????simpMessageSendingOperations.convertAndSend("/topic/public",?chatMessage);
????}
}
我們在哪里調(diào)用這個service呢,我們需要在監(jiān)聽到消息后調(diào)用,所以我們就要有下面的Redis監(jiān)聽消息處理專用類
新建類redis/RedisListenerHandle:
package?cn.monitor4all.springbootwebsocketdemo.redis;
import?cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import?cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import?cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Value;
import?org.springframework.data.redis.connection.Message;
import?org.springframework.data.redis.core.RedisTemplate;
import?org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import?org.springframework.stereotype.Component;
/**
?*?Redis訂閱頻道處理類
?*?@author?yangzhendong01
?*/
@Component
public?class?RedisListenerHandle?extends?MessageListenerAdapter?{
????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(RedisListenerHandle.class);
????@Value("${redis.channel.msgToAll}")
????private?String?msgToAll;
????@Value("${server.port}")
????private?String?serverPort;
????@Autowired
????private?RedisTemplate?redisTemplate;
????@Autowired
????private?ChatService?chatService;
????/**
?????*?收到監(jiān)聽消息
?????*?@param?message
?????*?@param?bytes
?????*/
????@Override
????public?void?onMessage(Message?message,?byte[]?bytes)?{
????????byte[]?body?=?message.getBody();
????????byte[]?channel?=?message.getChannel();
????????String?rawMsg;
????????String?topic;
????????try?{
????????????rawMsg?=?redisTemplate.getStringSerializer().deserialize(body);
????????????topic?=?redisTemplate.getStringSerializer().deserialize(channel);
????????????LOGGER.info("Received?raw?message?from?topic:"?+?topic?+?", raw message content:"?+?rawMsg);
????????}?catch?(Exception?e)?{
????????????LOGGER.error(e.getMessage(),?e);
????????????return;
????????}
????????if?(msgToAll.equals(topic))?{
????????????LOGGER.info("Send?message?to?all?users:"?+?rawMsg);
????????????ChatMessage?chatMessage?=?JsonUtil.parseJsonToObj(rawMsg,?ChatMessage.class);
????????????//?發(fā)送消息給所有在線Cid
????????????chatService.sendMsg(chatMessage);
????????}?else?{
????????????LOGGER.warn("No?further?operation?with?this?topic!");
????????}
????}
}
7. 看看效果
這樣,我們的改造就基本完成了!我們看一下效果
我們將服務(wù)器運(yùn)行在8080上,然后打開localhost:8080,起名Alice進(jìn)入聊天室
隨后,我們在application.properties中將端口server.port=8081
再次運(yùn)行程序(別忘了開啟IDEA的“允許啟動多個并行服務(wù)”設(shè)置,不然會覆蓋掉你的8080服務(wù),如下圖),在8081啟動一個聊天室,起名Bob進(jìn)入聊天室。
image如下兩圖,我們已經(jīng)可以在不同端口的兩個聊天室,互相聊天了?。ㄗ⒁饪磚rl)
image
image在互相發(fā)送消息是,我們還可以使用命令行監(jiān)聽下Redis的頻道websocket.msgToAll,可以看到雙方傳送的消息。如下圖:
image我們還可以打開Chrome的F12控制臺,查看前端的控制臺發(fā)送消息的log,如下圖:
image大功告成了嗎?
功能實現(xiàn)了,但是并不完美!你會發(fā)現(xiàn),Bob的加入并沒有提醒Bob進(jìn)入了聊天室(在單機(jī)版是有的),這是因為我們在“加入聊天室”的代碼還沒有修改,在加入時,只有Bob的服務(wù)器B里的其他用戶知道Bob加入了聊天室。我們還能再進(jìn)一步!
功能二/功能三:集群用戶上下線通知,集群用戶信息存儲
我們需要彌補(bǔ)上面的不足,將用戶上線下線的廣播發(fā)送到所有服務(wù)器上。
此外,我還希望以后能夠查詢集群中所有的在線用戶,我們在redis中添加一個set,來保存用戶名,這樣就可以隨時得到在線用戶的數(shù)量和名稱。
1. 在application.properties添加頻道名定義
#?Redis定義
redis.channel.userStatus?=?websocket.userStatus
redis.set.onlineUsers?=?websocket.onlineUsers
我們增加兩個定義
第一個是新增redis頻道websocket.userStatus用來廣播用戶上下線消息
第二個是redis的set,用來保存在線用戶信息
2. 在RedisListenerBean添加新頻道監(jiān)聽
container.addMessageListener(listenerAdapter,?new?PatternTopic(userStatus));
3. 在ChatService中添加
public?void?alertUserStatus(@Payload?ChatMessage?chatMessage)?{
????????LOGGER.info("Alert?user?online?by?simpMessageSendingOperations:"?+?chatMessage.toString());
????????simpMessageSendingOperations.convertAndSend("/topic/public",?chatMessage);
????}
在service中我們向本服務(wù)器的用戶廣播消息,用戶上線或者下線的消息都通過這里傳達(dá)。
4. 修改ChatController中的addUser方法
@MessageMapping("/chat.addUser")
????public?void?addUser(@Payload?ChatMessage?chatMessage,?SimpMessageHeaderAccessor?headerAccessor)?{
????????LOGGER.info("User?added?in?Chatroom:"?+?chatMessage.getSender());
????????try?{
????????????headerAccessor.getSessionAttributes().put("username",?chatMessage.getSender());
????????????redisTemplate.opsForSet().add(onlineUsers,?chatMessage.getSender());
????????????redisTemplate.convertAndSend(userStatus,?JsonUtil.parseObjToJson(chatMessage));
????????}?catch?(Exception?e)?{
????????????LOGGER.error(e.getMessage(),?e);
????????}
????}
我們修改了addUser方法,在這里往redis中廣播用戶上線的消息,并把用戶名username寫入redis的set中(websocket.onlineUsers)
5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法
@EventListener
????public?void?handleWebSocketDisconnectListener(SessionDisconnectEvent?event)?{
????????StompHeaderAccessor?headerAccessor?=?StompHeaderAccessor.wrap(event.getMessage());
????????String?username?=?(String)?headerAccessor.getSessionAttributes().get("username");
????????if(username?!=?null)?{
????????????LOGGER.info("User?Disconnected?:?"?+?username);
????????????ChatMessage?chatMessage?=?new?ChatMessage();
????????????chatMessage.setType(ChatMessage.MessageType.LEAVE);
????????????chatMessage.setSender(username);
????????????try?{
????????????????redisTemplate.opsForSet().remove(onlineUsers,?username);
????????????????redisTemplate.convertAndSend(userStatus,?JsonUtil.parseObjToJson(chatMessage));
????????????}?catch?(Exception?e)?{
????????????????LOGGER.error(e.getMessage(),?e);
????????????}
????????}
????}
在用戶關(guān)閉網(wǎng)頁時,websocket會調(diào)用該方法,我們在這里需要把用戶從redis的在線用戶set里刪除,并且向集群發(fā)送廣播,說明該用戶退出聊天室。
6. 修改Redis監(jiān)聽類RedisListenerHandle
?else?if?(userStatus.equals(topic))?{
????????????ChatMessage?chatMessage?=?JsonUtil.parseJsonToObj(rawMsg,?ChatMessage.class);
????????????if?(chatMessage?!=?null)?{
????????????????chatService.alertUserStatus(chatMessage);
????????????}
在監(jiān)聽類中我們接受了來自userStatus頻道的消息,并調(diào)用service
7. 看看效果
image
image此外,我們還可以在Reids中查詢到用戶信息:
imageWebSocket集群還有哪些可能性
有了這兩篇文章的基礎(chǔ), 我們當(dāng)然還能實現(xiàn)以下的功能:
某用戶A單獨(dú)私信給某用戶B,或者私信給某用戶群(用戶B和C)
系統(tǒng)提供外部調(diào)用接口,給指定用戶/用戶群發(fā)送消息,實現(xiàn)消息推送
系統(tǒng)提供外部接口,實時獲取用戶數(shù)據(jù)(人數(shù)/用戶信息)
感興趣的同學(xué)可以自己試試看。
總結(jié)我們在本文中把單機(jī)版的聊天室改為了分布式聊天室,大大提高了聊天室可用性。
本文工程源代碼:
單機(jī)版:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E5%8D%95%E6%9C%BA%E7%89%88
集群版:
https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E9%9B%86%E7%BE%A4%E7%89%88
公眾號文章導(dǎo)航:公眾號所有的文章導(dǎo)航
長按掃碼可關(guān)注獲取?
歡迎關(guān)注
點個再看
