你管這破玩意兒叫 MQ?
幸福的煩惱
張大胖最近是又喜又憂,喜的是業(yè)務(wù)量發(fā)展猛增,憂的是由于業(yè)務(wù)量猛增,一些原來(lái)不是問(wèn)題的問(wèn)題變成了大問(wèn)題,比如說(shuō)新會(huì)員注冊(cè)吧,原來(lái)注冊(cè)成功只要發(fā)個(gè)短信就行了,但隨著業(yè)務(wù)的發(fā)展,現(xiàn)在注冊(cè)成功也需要發(fā) push,發(fā)優(yōu)惠券,…等

這樣光注冊(cè)用戶這一步就需要調(diào)用很多服務(wù),導(dǎo)致用戶注冊(cè)都需要花不少時(shí)間,假設(shè)每個(gè)服務(wù)調(diào)用需要 50 ms,那么光以上服務(wù)就需要調(diào)用 200 ms,而且后續(xù)產(chǎn)品還有可能再加一些發(fā)新人紅包等活動(dòng),每加一個(gè)功能,除了引入額外的服務(wù)增加耗時(shí)外,還需要額外集成服務(wù),重發(fā)代碼,實(shí)在讓人煩不勝煩,張大胖想一勞永逸地解決這個(gè)問(wèn)題,于是找了 CTO Bill 來(lái)商量一下,看能否提供一些思路
Bill 一眼就看出了問(wèn)題的所在:你這個(gè)系統(tǒng)存在三個(gè)問(wèn)題:同步,耦合,流量暴增時(shí)系統(tǒng)被壓垮的風(fēng)險(xiǎn)
同步: 我們可以看到在注冊(cè)用戶后,需要同步調(diào)用其他模塊后才能返回,這是耗時(shí)高的根本原因!耦合:注冊(cè)用戶與其他模塊嚴(yán)重耦合,體現(xiàn)在每調(diào)用一個(gè)模塊,都需要在注冊(cè)用戶代碼處集成其他模塊的代碼并重新發(fā)布,此時(shí)在這些流程中只有注冊(cè)用戶這一步是核心流程,其他都是次要流程,核心流程應(yīng)該與次要流程解耦,否則只要其中一個(gè)次要流程調(diào)用失敗,整個(gè)流程也就失敗了,體現(xiàn)在前端就是明明已經(jīng)注冊(cè)成功了,但返回給用戶的卻是失敗的流量暴增風(fēng)險(xiǎn):如果某天運(yùn)營(yíng)搞活動(dòng),比如注冊(cè)后送新人紅包,那么很有可能導(dǎo)致用戶注冊(cè)的流量暴增,那么由于我們的注冊(cè)用戶流程過(guò)長(zhǎng),很有可能導(dǎo)致注冊(cè)用戶的服務(wù)無(wú)法承載相應(yīng)的流量壓力而導(dǎo)致系統(tǒng)雪崩
不愧是 CTO,一眼看出問(wèn)題所在,「那該怎么解決呢」張大胖問(wèn)到
「大胖,你應(yīng)該聽(tīng)說(shuō)過(guò)一句話:任何軟件問(wèn)題都可以通過(guò)添加一層中間層來(lái)解決,如果不能,那就再加一層,同樣的針對(duì)以上問(wèn)題我們也可以添加一個(gè)中間層來(lái)解決,比如添加個(gè)隊(duì)列,把用戶注冊(cè)這個(gè)事件放到隊(duì)列中,讓其他模塊去這個(gè)隊(duì)列里取這個(gè)事件然后再做相應(yīng)的操作」Bill 邊說(shuō)邊畫(huà)出了他所說(shuō)的中間層隊(duì)列

可以看到,這是個(gè)典型的生產(chǎn)者-消費(fèi)者模型,用戶注冊(cè)后只要把注冊(cè)事件丟給這個(gè)隊(duì)列就可以立即返回,實(shí)現(xiàn)了將同步變了異步,其他服務(wù)只要從這個(gè)隊(duì)列中拉取事件消費(fèi)即可進(jìn)行后續(xù)的操作,同時(shí)也實(shí)現(xiàn)了注冊(cè)用戶邏輯與其他服務(wù)的解耦,另外即使流量暴增也沒(méi)有影響,因?yàn)樽?cè)用戶將事件發(fā)給隊(duì)列后馬上返回了,這一發(fā)消息可能只要 5 ms,也就是說(shuō)總耗時(shí)是 50ms+5ms = 55 ms,而原來(lái)的總耗時(shí)是 200 ms,系統(tǒng)的吞吐量和響應(yīng)速度提升了近 4 倍,大大提升了系統(tǒng)的負(fù)責(zé)能力,這一步也就是我們常說(shuō)的削峰,將暴增的流量放入隊(duì)列中以實(shí)現(xiàn)平穩(wěn)過(guò)渡
「妙啊,加了一層隊(duì)列就達(dá)到了異步,解藕,削峰的目的,也完美地解決了我的問(wèn)題」張大胖興奮地說(shuō)
「先別高興得太早,你想想這個(gè)隊(duì)列該用哪個(gè),JDK 的內(nèi)置隊(duì)列是否可行,或者說(shuō)什么樣的隊(duì)列才能滿足我們的條件呢」Bill 提醒道
張大胖想了一下如果直接使用 JDK 的隊(duì)列(Queue)可能會(huì)有以下問(wèn)題:
由于隊(duì)列在生產(chǎn)者所在服務(wù)內(nèi)存,其他消費(fèi)者不得不從生產(chǎn)者中取,也就意味著生產(chǎn)者與消費(fèi)者緊耦合,這顯然不合理
消息丟失:現(xiàn)在是把消息存儲(chǔ)在隊(duì)列中,而隊(duì)列是在內(nèi)存中的,那如果機(jī)器宕機(jī),隊(duì)列中的消息不就丟失了嗎,顯然不可接受
單個(gè)隊(duì)列中的消息只能被一個(gè)服務(wù)消費(fèi),也就是說(shuō)如果某個(gè)服務(wù)從隊(duì)列中取消息消費(fèi)后,其他服務(wù)就取不了這個(gè)消息了,有一個(gè)辦法倒是可以,為每一個(gè)服務(wù)準(zhǔn)備一個(gè)隊(duì)列,這樣發(fā)送消息的時(shí)候只發(fā)送給一個(gè)隊(duì)列,再通過(guò)這個(gè)隊(duì)列把完整消息復(fù)制給其他隊(duì)列即可

這種做法雖然理論上可以,但實(shí)踐起來(lái)顯然有問(wèn)題,因?yàn)檫@就意味著每對(duì)接一個(gè)服務(wù)都要準(zhǔn)備一份一模一樣的隊(duì)列,而且復(fù)制多份消息性能也存在嚴(yán)重問(wèn)題,還得保證復(fù)制中消息不丟失,無(wú)疑增加了技術(shù)上的實(shí)現(xiàn)難度
broker
針對(duì)以上問(wèn)題 Bill 和張大胖商量了一下決定自己設(shè)計(jì)一個(gè)獨(dú)立于生產(chǎn)者和消費(fèi)者的消息隊(duì)列(姑且把中間這個(gè)保存消息的組件稱為 Broker),這樣的話就解決了問(wèn)題一,生產(chǎn)者把消息發(fā)給 Broker,消費(fèi)者只需把消息從 Broker 里拉出來(lái)消費(fèi)即可,生產(chǎn)者和消費(fèi)者就徹底解耦了,如下

那么這個(gè) Broker 應(yīng)該如何設(shè)計(jì)才能滿足我們的要求呢,顯然它應(yīng)該滿足以下幾個(gè)條件:
消息持久化:不能因?yàn)?Broker 宕機(jī)了消息就都丟失了,所以消息不能只保存在內(nèi)存中,應(yīng)該持久化到磁盤(pán)上,比如保存在文件里,這樣由于消息持久化了,它也可以被多個(gè)消費(fèi)者消費(fèi),只要每個(gè)消費(fèi)者保存相應(yīng)的消費(fèi)進(jìn)度,即可實(shí)現(xiàn)多個(gè)消費(fèi)者的獨(dú)立消費(fèi)
高可用:如果 Broker 宕機(jī)了,producer 就發(fā)不了消息了,consumer 也無(wú)法消費(fèi),這顯然是不可接受的,所以必須保證 Broker 的高可用
高性能:我們定一個(gè)指標(biāo),比如 10w TPS,那么要實(shí)現(xiàn)這個(gè)目的就得滿足以下三個(gè)條件:
-- producer 發(fā)送消息要快(或者說(shuō) broker 接收消息要快)
-- 持久化到文件要快
-- consumer 拉取消息要快
接下來(lái)我們?cè)賮?lái)看 broker 的整體設(shè)計(jì)情況
針對(duì)問(wèn)題一,我們可以把消息存儲(chǔ)在文件中,消息通過(guò)順序?qū)懭胛募?/strong>的方式來(lái)保證寫(xiě)入文件的高性能

順序?qū)懳募男阅芎芨?,接近于?nèi)存中的隨機(jī)寫(xiě),如下圖示

這樣 consumer 如果要消費(fèi)的話,就可以從存儲(chǔ)文件中讀取消息了。好了,現(xiàn)在問(wèn)題來(lái)了,我們都知道消息文件是存在硬盤(pán)中的,如果每次 broker 接收消息都寫(xiě)入文件,每次 consumer 讀取消息都從硬盤(pán)讀取文件,由于都是磁盤(pán) IO,是非常耗時(shí)的,有什么辦法可以解決呢
page cache
磁盤(pán) IO 是很慢的,為了避免 CPU 每次讀寫(xiě)文件都得和磁盤(pán)交互,一般先將文件讀取到內(nèi)存中,然后再由 CPU 訪問(wèn),這樣 CPU 直接在內(nèi)存中讀寫(xiě)文件就快多了,那么文件怎么從磁盤(pán)讀取入內(nèi)存呢,首先我們需要明白文件是以 block(塊)的形式讀取的,而 Linux 內(nèi)核在內(nèi)存中會(huì)以頁(yè)大小(一般為 4KB)為分配單位。對(duì)文件進(jìn)行讀寫(xiě)操作時(shí),內(nèi)核會(huì)申請(qǐng)內(nèi)存頁(yè)(內(nèi)存頁(yè)即 page,多個(gè) page 組成 page cache,即頁(yè)緩存),然后將文件的 block 加載到頁(yè)緩存中(n block size = 1 page size,如果一個(gè) block 大小等于一個(gè) page,則 n = 1)如下圖示

這樣的話讀寫(xiě)文件的過(guò)程就一目了解
對(duì)于讀文件:CPU 讀取文件時(shí),首先會(huì)在 page cache 中查找是否有相應(yīng)的文件數(shù)據(jù),如果有直接對(duì) page cache 進(jìn)行操作,如果沒(méi)有則會(huì)觸發(fā)一個(gè)缺頁(yè)異常(fault page)將磁盤(pán)上的塊加載到 page cache 中,同時(shí)由于程序局部性原理,會(huì)一次性加載多個(gè) page(讀取數(shù)據(jù)所在的 page 及其相鄰的 page )到 page cache 中以保證讀取效率
對(duì)于寫(xiě)文件:CPU 首先會(huì)將數(shù)據(jù)寫(xiě)入 page cache 中,然后再將 page cache 刷入磁盤(pán)中
CPU 對(duì)文件的讀寫(xiě)操作就轉(zhuǎn)化成了對(duì)頁(yè)緩存的讀寫(xiě)操作,這樣只要讓 producer/consumer 在內(nèi)存中讀寫(xiě)消息文件,就避免了磁盤(pán) IO
mmap
需要注意的是 page cache 是存在內(nèi)核空間中的,還不能直接為應(yīng)用程序所用,必須經(jīng)由 CPU 將內(nèi)核空間 page cache 拷貝到用戶空間中才能為進(jìn)程所用(同樣的如果是寫(xiě)文件,也是先寫(xiě)到用戶空間的緩沖區(qū)中,再拷貝到內(nèi)核空間的 page cache,然后再刷盤(pán))

畫(huà)外音:為啥要將 page cache 拷貝到用戶空間呢,這主要是因?yàn)轫?yè)緩存處在內(nèi)核空間,不能被用戶進(jìn)程直接尋址
上圖為程序讀取文件完整流程:
首先是硬盤(pán)中的文件數(shù)據(jù)載入處于內(nèi)核空間中的 page cache(也就是我們平常所說(shuō)的內(nèi)核緩沖區(qū))
CPU 將其拷貝到用戶空間中的用戶緩沖區(qū)中
程序通過(guò)用戶空間的虛擬內(nèi)存來(lái)映射操作用戶緩沖區(qū)(兩者通過(guò) MMU 來(lái)轉(zhuǎn)換),進(jìn)而達(dá)到了在內(nèi)存中讀寫(xiě)文件的目的
將以上流程簡(jiǎn)化如下

以上是傳統(tǒng)的文件讀 IO 流程,可以看到程序的一次讀文件經(jīng)歷了一次 read 系統(tǒng)調(diào)用和一次 CPU 拷貝,那么從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū)的這一步能否取消掉呢,答案是肯定的
只要將虛擬內(nèi)存映射到內(nèi)核緩存區(qū)即可,如下

可以看到使用這種方式有兩個(gè)好處
省去了 CPU 拷貝,原本需要 CPU 從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū),現(xiàn)在這一步省去了
節(jié)省了一半的空間: 因?yàn)椴恍枰獙?page cache 拷貝到用戶空間了,可以認(rèn)為用戶空間和內(nèi)核空間共享 page cache
我們把這種通過(guò)將文件映射到進(jìn)程的虛擬地址空間從而實(shí)現(xiàn)在內(nèi)存中讀寫(xiě)文件的方式稱為 mmap(Memory Mapped Files)
上面這張圖畫(huà)得有點(diǎn)簡(jiǎn)單了,再來(lái)看一下 mmap 的細(xì)節(jié)

先把磁盤(pán)上的文件映射到進(jìn)程的虛擬地址上(此時(shí)還未分配物理內(nèi)存),即調(diào)用 mmap 函數(shù)返回指針 ptr,它指向虛擬內(nèi)存中的一個(gè)地址,這樣進(jìn)程無(wú)需再調(diào)用 read 或 write 對(duì)文件進(jìn)行讀寫(xiě),只需要通過(guò) ptr 就能操作文件,所以如果需要對(duì)文件進(jìn)行多次讀寫(xiě),顯然使用 mmap 更高效,因?yàn)橹粫?huì)進(jìn)行一次系統(tǒng)調(diào)用,比起多次 read 或 write 造成的多次系統(tǒng)調(diào)用顯然開(kāi)銷會(huì)更低
但需要注意的是此時(shí)的 ptr 指向的是邏輯地址,并未真正分配物理內(nèi)存,只有通過(guò) ptr 對(duì)文件進(jìn)行讀寫(xiě)操作時(shí)才會(huì)分配物理內(nèi)存,分配之后會(huì)更新頁(yè)表,將虛擬內(nèi)存與物理內(nèi)存映射起來(lái),這樣虛擬內(nèi)存即可通過(guò) MMU 找到物理內(nèi)存,分配完內(nèi)存后即可將文件加載到 page cache,于是進(jìn)程就可在內(nèi)存中愉快地讀寫(xiě)文件了
使用 mmap 有力地提升了文件的讀寫(xiě)性能,它也是我們常說(shuō)的零拷貝的一種實(shí)現(xiàn)方式,既然 mmap 這么好,可能有人就要問(wèn)了,那為什么文件讀寫(xiě)不都用 mmap 呢,天下沒(méi)有免費(fèi)的午餐,mmap 也是有成本的,它有如下缺點(diǎn)
文件無(wú)法完成拓展:因?yàn)閳?zhí)行 mmap 的時(shí)候,你所能操作的范圍就已經(jīng)確定了,無(wú)法增加文件長(zhǎng)度
地址映射的開(kāi)銷:為了創(chuàng)建并維持虛擬地址空間與文件的映射關(guān)系,內(nèi)核中需要有特定的數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)這一映射。內(nèi)核為每個(gè)進(jìn)程維護(hù)一個(gè)任務(wù)結(jié)構(gòu) task_struct,task_struct 中的 mm_struct 描述了虛擬內(nèi)存的信息,mm_struct 中的 mmap 字段是一個(gè) vm_area_struct 指針,內(nèi)核中的 vm_area_struct 對(duì)象被組織成一個(gè)鏈表 + 紅黑樹(shù)的結(jié)構(gòu)。如下圖示

所以理論上,進(jìn)程調(diào)用一次 mmap 就會(huì)產(chǎn)生一個(gè) vm_area_struct 對(duì)象(不考慮內(nèi)核自動(dòng)合并相鄰且符合條件的內(nèi)存區(qū)域),vm_area_struct 數(shù)量的增加會(huì)增大內(nèi)核的管理工作量,增大系統(tǒng)開(kāi)銷
缺頁(yè)中斷(page fault)的開(kāi)銷: 調(diào)用 mmap 內(nèi)核只是建立了邏輯地址(虛擬內(nèi)存)到物理地址(物理內(nèi)存)的映射表,實(shí)際并沒(méi)有任何數(shù)據(jù)加載到物理內(nèi)存中,只有在主動(dòng)讀寫(xiě)文件的時(shí)候發(fā)現(xiàn)數(shù)據(jù)所在分頁(yè)不在內(nèi)存中時(shí)才會(huì)觸發(fā)缺頁(yè)中斷,分配物理內(nèi)存,缺頁(yè)中斷一次讀寫(xiě)只會(huì)觸發(fā)一個(gè) page 的加載,一個(gè) page 只有 4k,想象一次,如果一個(gè)文件是 1G,那就得觸發(fā) 256 次缺頁(yè)中斷!中斷的開(kāi)銷是很大的,那么對(duì)于大文件來(lái)說(shuō),就會(huì)發(fā)生很多次的缺頁(yè)中斷,這顯然是不可接受的,所以一般 mmap 得配合另一個(gè)系統(tǒng)調(diào)用 madvise,它有個(gè)文件預(yù)熱的功能可以建議內(nèi)核一次性將一大段文件數(shù)據(jù)讀取入內(nèi)存,這樣就避免了多次的缺頁(yè)中斷,同時(shí)為了避免文件從內(nèi)存中 swap 到磁盤(pán),也可以對(duì)這塊內(nèi)存區(qū)域進(jìn)行鎖定,避免換出
mmap 并不適合讀取超大型文件,mmap 需要預(yù)先分配連續(xù)的虛擬內(nèi)存空間用于映射文件,如果文件較大,對(duì)于 32 位地址空間(4 G)的系統(tǒng)來(lái)說(shuō),可能找不到足夠大的連續(xù)區(qū)域,而且如果某個(gè)文件太大的話,會(huì)擠壓其他熱點(diǎn)小文件的 page cache 空間,影響這些文件的讀寫(xiě)性能
綜上考慮,我們給每一個(gè)消息文件定為固定的 1G 大小,如果文件滿了的話再創(chuàng)建一個(gè)即可,我們把這些存儲(chǔ)消息的文件集合稱為 commitlog。這樣的設(shè)計(jì)還有另一個(gè)好處:在刪除過(guò)期文件的時(shí)候會(huì)很方便,直接把之前的文件整個(gè)刪掉即可,最新的文件無(wú)需改動(dòng),而如果把所有消息都寫(xiě)到一個(gè)文件里,顯然刪除之前的過(guò)期消息會(huì)非常麻煩
consumeQueue 文件
通過(guò) mmap 的方式我們極大地提高了讀寫(xiě)文件的效率,這樣的話即可將 commitlog 采用 mmap 的方式加載到 page cache 中,然后再在 page cache 中讀寫(xiě)消息,如果是寫(xiě)消息直接寫(xiě)入 page cache 當(dāng)然沒(méi)問(wèn)題,但如果是讀消息(消費(fèi)者根據(jù)消費(fèi)進(jìn)度拉取消息)的話可就沒(méi)這么簡(jiǎn)單了,當(dāng)然如果每個(gè)消息的大小都一樣,那么文件讀取到內(nèi)存中其實(shí)就相當(dāng)于數(shù)組了,根據(jù)消息進(jìn)度就能很快地定位到其在文件的位置(假設(shè)消息進(jìn)度為 offset,每個(gè)消息的大小為 size,則所要消費(fèi)的位置為 offset * size),但很顯然每個(gè)消息的大小基本不可能相同,實(shí)際情況很可能是類似下面這樣

如圖示:這里有三個(gè)消息,每個(gè)消息的消息體分別為 2kb,3kb,4kb,消息大小都不一樣
這樣的話會(huì)有兩個(gè)問(wèn)題
消息邊界不清,無(wú)法區(qū)分相鄰的兩個(gè)消息
即使解決了以上問(wèn)題,也無(wú)法解決根據(jù)消費(fèi)進(jìn)度快速定位其所對(duì)應(yīng)消息在文件的位置。假設(shè) broker 重啟了,然后讀取消費(fèi)進(jìn)度(消費(fèi)進(jìn)度可以持久化到文件中),此時(shí)不得不從頭讀取文件來(lái)定位消息在文件的位置,這在效率上顯然是不可接受的
那能否既能利用到數(shù)組的快速尋址,又能快速定位消費(fèi)進(jìn)度對(duì)應(yīng)消息在文件中的位置呢,答案是可以的,我們可以新建一個(gè)索引文件(我們將其稱為 consumeQueue 文件),每次寫(xiě)入 commitlog 文件后,都把此消息在 commitlog 文件中的 offset(我們將其稱為 commit offset,8 字節(jié)) 及其大?。╯ize,4 字節(jié))還有一個(gè) tag hashcode(8 字節(jié),它的作用后文會(huì)提到)這三個(gè)字段順序?qū)懭?consumeQueue 文件中


這樣每次追加寫(xiě)入 consumeQueue 文件的大小就固定為 20 字節(jié)了,由于大小固定,根據(jù)數(shù)組的特性,就能迅速定位消費(fèi)進(jìn)度在索引文件中的位置,然后即可獲取 commitlog offset 和 size,進(jìn)而快速定位其在 commitlog 中消息

這里有個(gè)問(wèn)題,我們上文提到 commitlog 文件固定大小 1G,寫(xiě)滿了會(huì)再新建一個(gè)文件,為了方便根據(jù) commitlog offset 快速定位消息是在哪個(gè) commitlog 的哪個(gè)位置,我們可以以消息偏移量來(lái)命名文件,比如第一個(gè)文件的偏移量是 0,第二個(gè)文件的偏移量為 1G(1024*1024*1024 = 1073741824 B),第三個(gè)文件偏移量為 2G(2147483648 B),如下圖示

同理,consumeQueue 文件也會(huì)寫(xiě)滿,寫(xiě)滿后也要新建一個(gè)文件再寫(xiě)入,我們規(guī)定 consumeQueue 可以保存 30w 條數(shù)據(jù),也就是 30w * 20 byte = 600w Byte = 5.72 M,為了便于定位消費(fèi)進(jìn)度是在哪個(gè) consumeQueue文件中,每個(gè)文件的名稱也是以偏移量來(lái)命名的,如下

知道了文件的寫(xiě)入與命名規(guī)則,我們?cè)賮?lái)看下消息的寫(xiě)入與消費(fèi)過(guò)程
消息寫(xiě)入:首先是消息被順序?qū)懭?commitlog 文件中,寫(xiě)入后此消息在文件中的偏移(commitlog offset)和大?。╯ize)會(huì)被順序?qū)懭胂鄳?yīng)的 consumeQueue 文件中
消費(fèi)消息:每個(gè)消費(fèi)者都有一個(gè)消費(fèi)進(jìn)度,由于每個(gè) consumeQueue 文件是根據(jù)偏移量來(lái)命名的,首先消費(fèi)進(jìn)度可根據(jù)二分查找快速定位到進(jìn)度是在哪個(gè) consumeQueue 文件,進(jìn)一步定義到是在此文件的哪個(gè)位置,由此可以讀取到消息的 commitlog offset 和 size,然后由于 commitlog 每個(gè)文件的命名都是按照偏移量命名的,那么根據(jù) commitlog offset 顯然可以根據(jù)二分查找快速定位到消息是在哪個(gè) commitlog 文件,進(jìn)而再獲取到消息在文件中的具體位置從而讀到消息
同樣的為了提升性能, consumeQueue 也利用了 mmap 進(jìn)行讀寫(xiě)
有人可能會(huì)說(shuō)這樣查找了兩次文件,性能可能會(huì)有些問(wèn)題,實(shí)際上并不會(huì),根據(jù)前文所述,可以使用 mmap + 文件預(yù)熱 + 鎖定內(nèi)存來(lái)將文件加載并一直保留到內(nèi)存中,這樣不管是 commitlog 還是 consumeQueue 都是在 page cache 中的,既然是在內(nèi)存中查找文件那性能就不是問(wèn)題了
對(duì) ConsumeQueue 的改進(jìn)--數(shù)據(jù)分片
目前為止我們討論的場(chǎng)景是多個(gè)消費(fèi)者獨(dú)立消費(fèi)消息的場(chǎng)景,這種場(chǎng)景我們將其稱為廣播模式,這種情況下每個(gè)消費(fèi)者都會(huì)全量消費(fèi)消息,但還有一種更常見(jiàn)的場(chǎng)景我們還沒(méi)考慮到,那就是集群模式,集群模式下每個(gè)消費(fèi)者只會(huì)消費(fèi)部分消息,如下圖示:

集群模式下每個(gè)消費(fèi)者采用負(fù)載均衡的方式分別并行消費(fèi)一部分消息,主要目的是為了加速消息消費(fèi)以避免消息積壓,那么現(xiàn)在問(wèn)題來(lái)了,Broker 中只有一個(gè) consumerQueue,顯然沒(méi)法滿足集群模式下并行消費(fèi)的需求,該怎么辦呢,我們可以借鑒分庫(kù)分表的設(shè)計(jì)理念:將數(shù)據(jù)分片存儲(chǔ),具體做法是創(chuàng)建多個(gè) consumeQueue,然后將數(shù)據(jù)平均分配到這些 consumerQueue 中,這樣的話每個(gè) consumer 各自負(fù)責(zé)獨(dú)立的 consumerQueue 即可做到并行消費(fèi)

如圖示: Producer 把消息負(fù)載均衡分別發(fā)送到 queue 0 和 queue 1 隊(duì)列中,consumer A 負(fù)責(zé) queue 0,consumer B 負(fù)責(zé) queue 1 中的消息消費(fèi),這樣可以做到并行消費(fèi),極大地提升了性能
topic
現(xiàn)在所有消息都持久化到 Broker 的文件中,都能被 consumer 消費(fèi)了,但實(shí)際上某些 consumer 可能只對(duì)某一類型的消息感興趣,比如只對(duì)訂單類的消息感興趣,而對(duì)用戶注冊(cè)類的消息無(wú)感,那么現(xiàn)在的設(shè)計(jì)顯然不合理,所以需要對(duì)消息進(jìn)行進(jìn)一步的細(xì)分,我們把同一種業(yè)務(wù)類型的的消息集合稱為 Topic。這樣消費(fèi)者就可以只訂閱它感興趣的 Topic 進(jìn)行消費(fèi),因此也不難理解 consumeQueue 是針對(duì) Topic 而言的,producer 發(fā)送消息時(shí)都會(huì)指定消息的 Topic,消息到達(dá) Broker 后會(huì)發(fā)送到 Topic 中對(duì)應(yīng)的 consumeQueue,這樣消費(fèi)者就可以只消費(fèi)它感興趣的消息了

tag
把消息按業(yè)務(wù)類型劃分成 Topic 粒度還是有點(diǎn)大,以訂單消息為例,訂單有很多種狀態(tài),比如訂單創(chuàng)建,訂單關(guān)閉,訂單完結(jié)等,某些消費(fèi)者可能只對(duì)某些訂單狀態(tài)感興趣,所以我們有時(shí)還需要進(jìn)一步對(duì)某個(gè) Topic 下的消息進(jìn)行分類,我們將這些分類稱為 tag,比如訂單消息可以進(jìn)一步劃分為訂單創(chuàng)建,訂單關(guān)閉,訂單完結(jié)等 tag

producer 在發(fā)消息的時(shí)候會(huì)指定 topic 和 tag,Broker 也會(huì)把 topic, tag 持久化到文件中,那么 consumer 就可以只訂閱它感興趣的 topic + tag 消息了,現(xiàn)在問(wèn)題來(lái)了,consumer 來(lái)拉消息的時(shí)候,Broker 怎么只傳給 consumer 根據(jù) topic + tag 訂閱的消息呢
還記得上文中提到消息持久化到 commitlog 后寫(xiě)入 consumeQueue 的信息嗎

主要寫(xiě)入三個(gè)字段,最后一個(gè)字段為 tag 的 hashcode,這樣的話由于 consumer 在拉消息的時(shí)候會(huì)把 topic,tag 發(fā)給 Broker ,Broker 就可以先根據(jù) tag 的 hashcode 來(lái)對(duì)比一下看看此消息是否符合條件,如果不是略過(guò)繼續(xù)往后取,如果是再?gòu)?commitlog 中取消息后傳給 consumer,有人可能會(huì)問(wèn)為什么存的是 tag hashcode 而不是 tag,主要有兩個(gè)原因
hashcode 是整數(shù),整數(shù)對(duì)比更快
為了保證此字段為固定的字節(jié)大?。╤ashcode 為 int 型,固定為 4 個(gè)字節(jié)),這樣每次寫(xiě)入 consumeQueue 的三個(gè)字段即為固定的 20 字節(jié),即可利用數(shù)組的特性快速定位消息進(jìn)度在文件中的位置,如果用 tag 的話,由于 tag 是字符串,是變長(zhǎng)的,沒(méi)法保證固定的字節(jié)大小
至此我們簡(jiǎn)單總結(jié)下消息的發(fā)送,存儲(chǔ)與消息流程

首先 producer 發(fā)送 topic,queueId,message 到 Broker 中,Broker 將消息通過(guò)順序?qū)懙男问匠志没?commitlog 中,這里的 queueId 是 Topic 中指定的 consumeQueue 0,consumeQueue 1,consumeQueue …,一般通過(guò)負(fù)載均衡的方式輪詢寫(xiě)入對(duì)應(yīng)的隊(duì)列,比如當(dāng)前消息寫(xiě)入 consumeQueue 0,下一條寫(xiě)入 consumeQueue 1,…,不斷地循環(huán)
持久化之后可以知道消息在 commitlog 文件中的偏移量和消息體大小,如果 consumer 指定訂閱了 topic 和 tag,還會(huì)算出 tag hashCode,這樣的話就可以將這三者順序?qū)懭?queueId 對(duì)應(yīng)的 consumeQueue 中
消費(fèi)者消費(fèi):每一個(gè) consumeQueue 都能找到每個(gè)消費(fèi)者的消息進(jìn)度(consumeOffset),據(jù)此可以快速定位其所在的 consumeQueue 的文件位置,取出 commitlog offset,size,tag hashcode 這三個(gè)值,然后首先根據(jù) tag hashcode 來(lái)過(guò)濾消息,如果匹配上了再根據(jù) commitlog offset,size 這兩個(gè)元素到 commitlog 中去查找相應(yīng)的消息然后再發(fā)給消費(fèi)者
注意:所有 Topic 的消息都寫(xiě)入同一個(gè) commitlog 文件(而不是每個(gè) Topic 對(duì)應(yīng)一個(gè) commitlog 文件),然后消息寫(xiě)入后會(huì)根據(jù) topic,queueId 找到 Topic 所在的 consumeQueue 再寫(xiě)入
需要注意的是我們的 Broker 是要設(shè)定為高性能的(10 w QPS)那么上面這些步驟有兩個(gè)瓶頸點(diǎn)
producer 發(fā)送消息到持久化至 commitlog 文件的性能問(wèn)題。先來(lái)看下刷盤(pán)流程

如圖示,Broker 收到消息后是先將消息寫(xiě)到了內(nèi)核緩沖區(qū) 的 page cache 中,最終將消息刷盤(pán),那么消息是寫(xiě)到 page cache 返回 ack,還是刷盤(pán)后再返回呢,這取決于你消息的重要性,如果是像日志這樣的消息,丟了其實(shí)也沒(méi)啥影響,這種情況下顯然可以選擇寫(xiě)到 page cache 后就馬上返回,OS 會(huì)擇機(jī)將其刷盤(pán),這種刷盤(pán)方式我們將其稱為異步刷盤(pán),這也是大多數(shù)業(yè)務(wù)場(chǎng)景選擇的刷盤(pán)方式,這種方式其實(shí)已經(jīng)足夠安全了,哪怕 JVM 掛掉了,由于 page cache 是由 OS 管理的,OS 也能保證將其刷盤(pán)成功,除非 Broker 機(jī)器宕機(jī)。當(dāng)然對(duì)于像轉(zhuǎn)賬等安全性極高的金融場(chǎng)景,我們可能還是要將消息從 page cache 刷盤(pán)后再返回 ack,這種方式我們稱為同步刷盤(pán),顯然這種方式會(huì)讓性能大大降低,使用要慎重
consumer 拉取消息的性能問(wèn)題
很顯然這一點(diǎn)不是什么問(wèn)題,上文提到,不管是 commitlog 還是 consumeQueue 文件,都緩存在 page cache 中,那么直接從 page cache 中讀消息即可,由于是基于內(nèi)存的操作,不存在什么瓶頸,當(dāng)然這是基于消費(fèi)進(jìn)度與生產(chǎn)進(jìn)度差不多的前提,如果某個(gè)消費(fèi)者指定要從某個(gè)進(jìn)度開(kāi)始消費(fèi),且此進(jìn)度對(duì)應(yīng)的 commitlog 文件不在 page cache 中,那就會(huì)觸發(fā)磁盤(pán) IO
Broker 的高可用
上文我們都是基于一個(gè) Broker 來(lái)討論的,這顯然有問(wèn)題,Broker 如果掛了,依賴它的 producer,consumer 不就也嗝屁了嗎,所以 broker 的高可用是必須的,一般采用主從模式來(lái)實(shí)現(xiàn) broker 的高可用

如圖示:Producer 將消息發(fā)給 主 Broker ,然后 consumer 從主 Broker 里拉消息,而 從 Broker 則會(huì)從主 Broker 同步消息,這樣的話一旦主 Broker 宕機(jī)了,consumer 可以從 Broker 里拉消息,同時(shí)在 RocketMQ 4.5 以后,引入一種 dledger 模式,這種模式要求一主多從(至少 3 個(gè)節(jié)點(diǎn)),這樣如果主 Broker 宕機(jī)后,另外多個(gè)從 Broker 會(huì)根據(jù) Raft 協(xié)議選舉出一個(gè)主 Broker,Producer 就可以向這個(gè)新選舉出來(lái)的主節(jié)點(diǎn)發(fā)送消息了
如果 QPS 很高只有一個(gè)主 Broker 的話也存在性能上的瓶頸,所以生產(chǎn)上一般采用多主的形式,如下圖示

這樣的話 Producer 可以負(fù)載均衡地將消息發(fā)送到多個(gè) Broker 上,提高了系統(tǒng)的負(fù)載能力,不難發(fā)現(xiàn)這意味著 Topic 是分布式存儲(chǔ)在多個(gè) Broker 上的,而 Topic 在每個(gè) Broker 上的存儲(chǔ)都是以多個(gè) consumeQueue 的形式存在的,這極大地提升了 Topic 的水平擴(kuò)展與系統(tǒng)的并發(fā)執(zhí)行能力

nameserver
目前為止我們的設(shè)計(jì)貌似不錯(cuò),通過(guò)一系列設(shè)計(jì)讓 Broker 滿足了高性能,高擴(kuò)展的要求,但我們似乎忽略了一個(gè)問(wèn)題,Producer,Consumer 該怎么和 Broker 通信呢,一種做法是在 Producer,Consumer 寫(xiě)死要通信的 Broker ip 地址,雖然可行,但這么做的話顯然會(huì)有很大的問(wèn)題,配置死板,擴(kuò)展性差,考慮以下場(chǎng)景
如果擴(kuò)容(新增 Broker),producer 和 consumer 是不是也要跟著新增 Broker ip 地址
每次新增 Topic 都要指定在哪些 Broker 存儲(chǔ),我們知道 producer 在發(fā)消息,consumer 在訂閱消息的時(shí)候都要指定對(duì)應(yīng)的 Topic ,那就意味著每次新增 Topic 后都需要在 producer,consumer 做相應(yīng)變更(記錄 topic -> broker 地址)
如果 broker 宕機(jī)了,producer 和 consumer 需要將其從配置中移除,這就意味著 producer,consumer 需要與相關(guān)的 brokers 通過(guò)心跳來(lái)通信以便知道其存活與否,這樣無(wú)疑增加了設(shè)計(jì)的復(fù)雜度
參考下 dubbo 這類 RPC 框架,你會(huì)發(fā)現(xiàn)基本上都會(huì)新增一個(gè)類似 Zookeeper 這樣的注冊(cè)中心的中間層(一般稱其為 nameserver),如下

主要原理如下:
為了保證高可用,一般 nameserver 以集群的形式存在(至少兩個(gè)),Broker 啟動(dòng)后不管主從都會(huì)向每一個(gè) nameserver 注冊(cè),注冊(cè)的信息有哪些呢,想想看 producer 要發(fā)消息給 broker 需要知道哪些信息呢,首先發(fā)消息要指定 Topic,然后要指定 Topic 所在的 broker,再然后是知道 Topic 在 Broker 中的隊(duì)列數(shù)量(可以這樣負(fù)載均衡地將消息發(fā)送到這些 queue 中),所以 broker 向 nameserver 注冊(cè)的信息中應(yīng)該包含以下信息

這樣的話 producer 和 consumer 就可以通過(guò)與 nameserver 建立長(zhǎng)連接來(lái)定時(shí)(比如每隔 30 s)拉取這些路由信息從而更新到本地,發(fā)送/消費(fèi)消息的時(shí)候就可以依據(jù)這些路由信息進(jìn)行發(fā)送/消費(fèi)
那么加了一個(gè) nameserver 和原來(lái)的方案相比有什么好處呢,可以很明顯地看出:producer/consumer 與具體的 broker 解耦了,極大提升了整體架構(gòu)的可擴(kuò)展性:
producer/consumer 的所有路由信息都能通過(guò) nameserver 得到,比如現(xiàn)在要在 brokers 上新建一個(gè) Topic,那么 brokers 會(huì)把這些信息同步到 nameserver,而 producer/consumer 會(huì)定時(shí)去 nameserver 拉取這些路由信息更新到本地,做到了路由信息配置的自動(dòng)化
同樣的如果某些 broker 宕機(jī)了,由于 broker 會(huì)定時(shí)上報(bào)心跳到 nameserver 以告知其存活狀態(tài),一旦 nameserver 監(jiān)測(cè)到 broker 失效了,producer/consumer 也能從中得到其失效信息,從而在本地路由中將其剔除
可以看到通過(guò)加了一層 nameserver,producer/consumer 路由信息做到了配置自動(dòng)化,再也不用手動(dòng)去操作了,整體架構(gòu)甚為合理
總結(jié)
以上即我們所要闡述的 RocketMQ 的設(shè)計(jì)理念,基本上涵蓋了重要概念的介紹,我們?cè)賮?lái)簡(jiǎn)單回顧一下:
首先根據(jù)業(yè)務(wù)場(chǎng)景我們提出了 RocketMQ 設(shè)計(jì)的三大目標(biāo):消息持久化,高性能,高可用,毫無(wú)疑問(wèn) broker 的設(shè)計(jì)是實(shí)現(xiàn)這三大目標(biāo)的關(guān)鍵,為了消息持久化,我們?cè)O(shè)計(jì)了 commitlog 文件,通過(guò)順序?qū)懙姆绞奖WC了文件寫(xiě)入的高性能,但如果每次 producer 寫(xiě)入消息或者 consumer 讀取消息都從文件來(lái)讀寫(xiě),由于涉及到磁盤(pán) IO 顯然性能會(huì)有很大的問(wèn)題,于是我們了解到操作系統(tǒng)讀寫(xiě)文件會(huì)先將文件加載到內(nèi)存中的 page cache 中。對(duì)于傳統(tǒng)的文件 IO,由于 page cache 存在內(nèi)核空間中,還需要將其拷貝到用戶空間中才能為進(jìn)程所用(同樣的,寫(xiě)入消息也要寫(xiě)將消息寫(xiě)入用戶空間的 buffer,再拷貝到 內(nèi)核空間中的 page cache),于是我們使用了 mmap 來(lái)避免了這次拷貝,這樣的話 producer 發(fā)送消息只要先把消息寫(xiě)入 page cache 再異步刷盤(pán),而 consumer 只要保證消息進(jìn)度能跟得上 producer 產(chǎn)生消息的進(jìn)度,就可以直接從 page cache 中讀取消息進(jìn)行消費(fèi),于是 producer 與 consumer 都可以直接從 page cache 中讀寫(xiě)消息,極大地提升了消息的讀寫(xiě)性能,那怎么保證 consumer 消費(fèi)足夠快以跟上 producer 產(chǎn)生消息的速度的,顯然,讓消息分布式,分片存儲(chǔ)是一種通用方案,這樣的話通過(guò)增加 consumer 即可達(dá)到并發(fā)消費(fèi)消息的目的
最后,為了避免每次創(chuàng)建 Topic 或者 broker 宕機(jī)都得修改 producer/consumer 上的配置,我們引入了 nameserver, 實(shí)現(xiàn)了服務(wù)的自動(dòng)發(fā)現(xiàn)功能。
仔細(xì)與其它 RPC 框架橫向?qū)Ρ群?,你?huì)發(fā)現(xiàn)這些 RPC 框架用的思想其實(shí)都很類似,比如數(shù)據(jù)使用分片存儲(chǔ)以提升數(shù)據(jù)存儲(chǔ)的水平擴(kuò)展與并發(fā)執(zhí)行能力,使用 zookeeper,nameserver 等注冊(cè)中心來(lái)達(dá)到服務(wù)注冊(cè)與自動(dòng)發(fā)現(xiàn)的目的,所以掌握了這些思想, 我們?cè)偃ビ^察學(xué)習(xí)或設(shè)計(jì) RPC 時(shí)就能達(dá)到事半功倍的效果
