消息隊列常見問題總結(jié)和分析
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質(zhì)文章,第一時間送達
? 作者?|??九卷
來源 |? urlify.cn/BRjiIr
一、簡介
很久以前也寫過一篇關(guān)于消息隊列的文章,這里的文章,這篇文章是對消息隊列使用場景,以及一些模型做過一點介紹。
這篇文章將分析消息隊列常見問題。
消息隊列:利用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)集成。
從定義看:它是一種數(shù)據(jù)交流平臺,也是數(shù)據(jù)通信平臺。
然而,數(shù)據(jù)通信我們可以用http,RPC來進行通信,這些與消息隊列有什么區(qū)別呢?
最大的區(qū)別就是同步和異步。http和RPC一般都是同步,而消息隊列是異步。
二、為什么要用消息隊列
1.解耦
雙方不在基于對方直接通信了,而是基于消息隊列來通信,通過MQ解耦了客戶端和服務(wù)端通信。處理數(shù)據(jù)的雙方關(guān)注的點不同了,比如說一個事務(wù),我們只關(guān)心核心流程,而需要依賴其他系統(tǒng)但不是那么重要的事情,有通知即可,不需要等待結(jié)果。這種消息模型,關(guān)心的是通知,而不在意處理過程。也可以用消息隊列。
上下游開發(fā)人員也可以基于消息隊列發(fā)送消息,而不需要同步的處理消息了。
2.異步處理
傳統(tǒng)的業(yè)務(wù)邏輯都是基于同步的方式進行處理的。而有了消息隊列,就可以把消息存放在MQ里,消息隊列的消費者就可以從消息隊列中獲取數(shù)據(jù)并進行處理。它不一定要實時處理,可以隔幾分鐘處理消息隊列里的數(shù)據(jù)。
3.削峰和流控
這里有點像計算機中的硬件,比如CPU和內(nèi)存,CPU運算速度比內(nèi)存高N個數(shù)量級,那怎么才能緩解兩者之間的差異?中間加一個緩存來緩解兩者速度的差異。
同理,MQ也可以起到這種作用。對于上下游軟件不同的處理速度的差異進行調(diào)節(jié)。
比如,我們常見的秒殺應(yīng)用,前端瞬間涌入成千上萬的請求,前端可以承受這么大的請求壓力,但是復雜的后端系統(tǒng),肯定會被壓垮,從而導致秒殺服務(wù)不可以用的情況。為了解決這種前后端處理速度不平衡的差異,導致的服務(wù)問題,可以引入消息隊列來調(diào)節(jié),用消息隊列來緩存用戶的請求,等待后端系統(tǒng)來消費。
上面就是消息隊列的主要功能,當然還有其他一些功能,比如消息廣播,最終一致性等。
使用MQ后的問題
當然使用了消息隊列,會增加系統(tǒng)的復雜性,一致性延遲,可用性降低等問題。
可用性降低是指系統(tǒng)可用性降低,如果MQ掛了,那么肯定會影響到整個系統(tǒng)了。
因為上下游系統(tǒng)可能都會與MQ交互。
三、什么時候引入MQ?
這個要看業(yè)務(wù)系統(tǒng)功能需求,一個是系統(tǒng)處理是否到達了瓶頸,需要消息隊列來緩解;
還有,業(yè)務(wù)系統(tǒng)一致性要求是不是特別高。通常業(yè)務(wù)系統(tǒng)不會要求那么高的一致性要求。當然一些高頻交易系統(tǒng),一致性要求特別高,就不適合用了。
引入任何一個新的軟件必然會增加原有系統(tǒng)的復雜性,還是要根據(jù)業(yè)務(wù)特性進行合理的選擇。
四、消息隊列常見問題
1.如何保證消息不被重復消費(怎么保證冪等)
為什么會重復消費
生產(chǎn)者:也就是客戶端,可能會重復推送一條數(shù)據(jù)到MQ中。有可能是客戶端超時重復推送,也有可能是網(wǎng)絡(luò)比較慢客戶端重復推送了數(shù)據(jù)到MQ中。
MQ:消費者消費完了一條數(shù)據(jù),發(fā)送ACK信息表示消費成功時,這時候,MQ突然掛了,導致MQ以為消費者還未消費該條消息,MQ恢復后再次推送了該條消息,導致重復消費。
消費者:與上面MQ掛掉情況類似,消費者已經(jīng)消費完了一條消息,正準備給MQ發(fā)送ACK消息但還未發(fā)送時,這時候消費者掛了,服務(wù)重啟后MQ以為消費者還沒有消費該條消息,再次推送該條消息。
怎么處理重復消費
每個消息都帶一個唯一的消息id。消費端保證不重復消費就可以了,即使生產(chǎn)端產(chǎn)生了重復的數(shù)據(jù),當然生產(chǎn)端也最好控制下重復數(shù)據(jù)。
消費端保證不重復消費:
通常方法都是存儲消費了的消息,然后判斷消息是否存在。
1.先保存在查詢
每次保存數(shù)據(jù)前,先查詢下,不存在就插入。這種是并發(fā)不高的情況下可以使用。
2.數(shù)據(jù)庫添加唯一約束條件
比如唯一索引
3.增加一個消息表
已經(jīng)消費的消息,把消息id插入到消息表里面。
為了保證高并發(fā),消息表可以用Redis來存。
2.如何處理消息丟失的問題
消息丟失的原因
生產(chǎn)者:生產(chǎn)者推送消息到MQ中,但是網(wǎng)絡(luò)出現(xiàn)了故障,比如網(wǎng)絡(luò)超時,網(wǎng)絡(luò)抖動,導致消息沒有推送到MQ中,在網(wǎng)絡(luò)中丟失了。又或者推送到MQ中了,但是這時候MQ內(nèi)部出錯導致消息丟失。
MQ:MQ自己內(nèi)部發(fā)生了錯誤,導致消息丟失。
消費者:有時處理消息的消費者處理不當,還沒等消息處理完,就給MQ發(fā)送確認信息,但是這時候消費者自身出問題,掛了,確認消息已經(jīng)發(fā)送給MQ告訴MQ自己已經(jīng)消費完了,導致消息丟失。
如何保證消息不丟失呢?下面談?wù)勥@方面的做法。
3.如何保證消息可靠性傳輸
整個消息從生產(chǎn)到消費一般分為三個階段:生產(chǎn)者-生產(chǎn)階段,MQ-存儲階段,消費者-消費階段
3.1 生產(chǎn)者-生產(chǎn)階段
在這個階段,一般通過請求確認機制,來保證消息可靠性傳輸。與TCP/IP協(xié)議里ACK機制有點像。
客戶端發(fā)送消息到消息隊列,消息隊列給客戶端一個確認響應(yīng),表示消息已經(jīng)收到,客戶端收到響應(yīng),表示一次正常消息發(fā)送完畢。
3.2 MQ-存儲階段
消息隊列給客戶端發(fā)送確認消息。存儲完成后,才發(fā)送確認消息。
3.3 消費者-消費階段
跟生產(chǎn)階段相同,消費完了,給消息隊列發(fā)送確認消息。
4.如何保證消息的順序性
我們?nèi)粘Uf的順序性是什么呢?
比如說小孩早上上學過程,他先起床,然后洗漱,吃早餐,最后上學。我們認為他做的事情是有先后順序的,及是時間的先后順序,我們用時間來標記他的順序。
更抽象的理解,這些發(fā)生的事件有一個相同的參考系,即他們的時間是對應(yīng)同一個物理時鐘的時間。
如果沒有絕對的時間作為參考系,那他們之間還能確定順序嗎?
如果事件之間有因果關(guān)系,比如A、B兩個事件是因果關(guān)系,那么A一定發(fā)生在B之前(前應(yīng)后果)。相反,在沒有一個絕對的時間的參考的情況下,若A、B之間沒有因果關(guān)系,那么A、B之間就沒有順序關(guān)系。跟java里的happen before很像。
總結(jié)一下,我們說順序時,其實說的是:
在有絕對時間作為參考系的情況下,事件發(fā)生的時間先后關(guān)系;
在沒有絕對時間作為參考系的情況下,一種由因果關(guān)系推斷出來的happening before的關(guān)系;
在分布式系統(tǒng)領(lǐng)域,有一篇關(guān)于時間,時鐘和事件的順序的很有名的一篇論文
Time, Clocks, and the Ordering of Events in a Distributed System,可以看一看,上面舉例情況都是參考這篇論文。
參考上面的結(jié)論,在消息隊列中,我們也是以時間作為參考系,讓消息有序。
但是,在消息隊列中,消息有序會遇到一些問題,下面讓我們來討論這些問題。
消息的順序性的一些問題
在計算機系統(tǒng)中,有一個比較棘手的問題是,它可以是多線程執(zhí)行的,而且哪個線程先運行,哪個線程后運行,完全是由操作系統(tǒng)決定的,完全沒有規(guī)律,是亂序執(zhí)行。顯然與消息隊列中的消息有序相悖。
還有,在消息隊列中,涉及到生產(chǎn)者,MQ,消費者,還有網(wǎng)絡(luò),這4者之間的關(guān)系。然后他們又涉及到消息的順序性,就有很多種情況需要考慮??梢詤⒖歼@篇文章
分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐(作者:CHUAN.CHEN),各種情況討論的很全面。
最后的結(jié)論就是:消息的順序性,不僅僅是MQ本身存儲消息要保證順序性,還需要生產(chǎn)者和消費者一同來保證順序性。
順序性保證
在消息隊列中,消息的順序性需要3方面來保證:
1、生產(chǎn)者發(fā)送消息時要保證順序
2、消息被消息隊列存儲時要保持和發(fā)送的順序一致
3、消息被消費時保持和存儲的順序一致
生產(chǎn)者:發(fā)送時要求用戶在同一個線程中采用同步的方式發(fā)送。
消息隊列:存儲保持和發(fā)送的順序一致。一般是在一個分區(qū)中保持順序性。
消費者:一個分區(qū)的消息由一個線程來處理消費消息。
https://www.hicsc.com/post/2020041566 這個鏈接中,作者分析了RocketMQ順序消息的代碼實現(xiàn)。
5.消息隊列中消息延遲問題
你說的 消息的延遲 是延遲消息隊列嗎?啊,并不是,是完全2個不同的概念。延遲消息隊列是MQ提供的一個功能。消息的延遲,是指消費端消費的速度跟不上生產(chǎn)端產(chǎn)生消息的速度,可能導致消費端丟失數(shù)據(jù),也可能導致消息積壓在MQ中。所以這里說的消息的延遲,指的是消費端消費消息的延遲。
消息隊列的消費模型pull和push:
1、push模式
這種模式是消息隊列主動將消息推送給消費者。
優(yōu)點:盡可能實時的將消息發(fā)送給消費者進行消費。
缺點:如果消費端消費能力弱,消費端的消費速度趕不上生產(chǎn)端,而MQ又不斷的給消費端推送消息,消費端的緩存滿了導致緩存溢出,就會產(chǎn)生錯誤或丟失數(shù)據(jù)的可能。
2、pull模式
這種模式是由消費端主動向消息隊列拉取消息。
優(yōu)點:可以自主可控的拉取消息。
缺點:拉取消息的頻率不好控制。
a、如果每次pull時間間隔比較久,會增加消息延遲,消息到達消費者時間會加長。這樣時間一長會導致MQ中消息的堆積,而消息長時間堆積就會導致一系列的問題:
1、如果積壓了幾個小時的數(shù)據(jù),有幾千萬的數(shù)據(jù)量,消費端處理的壓力會越來越大。
2、如果是帶有過期時間的消息,可能這些消息已經(jīng)到了過期時間,因為積壓時間太長,但還沒被消費端消費掉,消費端來不及消費。
3、如果持續(xù)的積壓,達到了MQ能存儲消息數(shù)量的上限,也就是說MQ滿了,存不下了,會導致MQ丟掉數(shù)據(jù),導致數(shù)據(jù)丟失。
想一下,上面的情形是不是跟TCP/IP協(xié)議的流量控制和擁塞控制遇到的一些問題很像,也有很多不同。
b、如果每次pull的時間間隔比較短,在一段時間內(nèi)MQ中沒有可消費的消息,會產(chǎn)生很多無效的pull請求,導致一定的網(wǎng)絡(luò)開銷。
所以解決問題的辦法最主要就是優(yōu)化消費端的消費性能。1.優(yōu)化消費邏輯 2.水平擴容,增加消費端并發(fā)。
延遲問題處理
如果消息堆積已經(jīng)發(fā)生了,導致了上面的3個問題,這時怎么辦?
1、積壓了幾個小時幾千萬的數(shù)據(jù)
第一:肯定要找到積壓數(shù)據(jù)的原因,一般都是消費端的問題。
第二:如果可以的,擴大消費端的數(shù)量,快速消費掉消息。
第三:擴容,增加多機器消費。新建一個topic,partition是原來10倍,建立原先10倍的queue。然后寫一個臨時的消費程序,這個消費程序去轉(zhuǎn)移積壓的數(shù)據(jù),把積壓的數(shù)據(jù)均勻輪詢寫入建立好的10倍數(shù)量的queue。然后在征用10倍機器的消費端來消費這個queue。這種做法相當于臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數(shù)據(jù)。消費完了,恢復原來的部署。這是大廠做法。
2、積壓時間過長,帶有過期時間的消息過期失效了
這個沒有好的辦法處理,只能通過程序找出丟失的數(shù)據(jù),然后也是通過程序把丟失的數(shù)據(jù)重新導入到MQ里,重新消費。
3、長時間積壓倒是MQ寫滿了
這個也沒啥好辦法處理,只能快速消費掉MQ里的數(shù)據(jù),快速消費指消費一個,丟掉一個,不要這些數(shù)據(jù)了,然后重新導入數(shù)據(jù)。用戶少的時候在補回數(shù)據(jù)。
6.消息隊列高可用
6.1 kafka
kafka基本架構(gòu):
Broker:一個kafka節(jié)點就是一個broker,多個broker組成一個kafka集群。一個broker可以是一個單機器kafka服務(wù)器。
Topic:存放消息的主題,相當于一個隊列??梢岳斫鉃榇娣畔⒌姆诸?,比如你可以有前端日志的Topic,后端日志的Topic??梢岳斫鉃镸ySQL里的表。
Partition:一個topic可以劃分為多個partition,每個partition都是一個有序隊列。把topic主題中的消息進行分拆,均攤到kafka集群中不同機器上。partition是topic的進一步拆分。
Replica:副本消息。kafka可以以partition為單位,保存多個副本,分散在不同的broker上。副本數(shù)是可以設(shè)置的。
Segment: 一個Partition被切分為多個Segment,每個Segment包含索引文件和數(shù)據(jù)文件。
Message:kafka里最基本消息單元。

一個kafka集群可以由多個broker組成,每個broker是一個節(jié)點,你創(chuàng)建一個topic,這個topic可以劃分為多個partition,每個partition可以存儲在不同的broker上,每個partition存放一部分數(shù)據(jù)。
6.2 RocketMQ
在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一種部署方式來實現(xiàn)高可用。
一組 Broker 中有一個 Master,有零到多個 Slave,Slave 通過同步復制或異步復制方式去同步 Master 的數(shù)據(jù)。Master/Slave 部署模式,提供了一定的高可用性。
上面主從高可用架構(gòu)有一個缺點:
主節(jié)點掛了后需要人為的進行重啟或者切換。為了解決這個問題,后續(xù)引入了raft,用raft協(xié)議來完成自動選主。RocketMQ的DLedger 就是一個基于 raft 協(xié)議的 commitlog 存儲庫,也是 RocketMQ 實現(xiàn)新的高可用多副本架構(gòu)的關(guān)鍵。
還可以多master多slave部署,防止單點故障。
五、參考
https://www.hicsc.com/post/2020041566
https://tech.meituan.com/2016/07/01/mq-design.html
https://lamport.azurewebsites.net/pubs/time-clocks.pdf
https://rocketmq.apache.org/docs/quick-start/
https://github.com/apache/rocketmq
https://github.com/apache/rocketmq/tree/master/docs/cn
https://github.com/apache/kafka
https://kafka.apache.org/documentation/#gettingStarted
https://bug-free.cn/2020/01/09/Kafka-架構(gòu)設(shè)計/
粉絲福利:Java從入門到入土學習路線圖
???

?長按上方微信二維碼?2 秒
感謝點贊支持下哈?
