深入解析分布式消息隊(duì)列設(shè)計(jì)精髓
點(diǎn)擊上方“服務(wù)端思維”,選擇“設(shè)為星標(biāo)”
回復(fù)”669“獲取獨(dú)家整理的精選資料集
回復(fù)”加群“加入全國(guó)服務(wù)端高端社群「后端圈」
作者:vincentchma,騰訊 IEG 后臺(tái)開(kāi)發(fā)工程師
一、消息隊(duì)列的演進(jìn)
分布式消息隊(duì)列中間件是是大型分布式系統(tǒng)中常見(jiàn)的中間件。消息隊(duì)列主要解決應(yīng)用耦合、異步消息、流量削鋒等問(wèn)題,具有高性能、高可用、可伸縮和最終一致性等特點(diǎn)。消息隊(duì)列已經(jīng)逐漸成為企業(yè)應(yīng)用系統(tǒng)內(nèi)部通信的核心手段,使用較多的消息隊(duì)列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等,此外,利用數(shù)據(jù)庫(kù)(如 Redis、MySQL 等)也可實(shí)現(xiàn)消息隊(duì)列的部分基本功能。
1.基于 OS 的 MQ
單機(jī)消息隊(duì)列可以通過(guò)操作系統(tǒng)原生的進(jìn)程間通信機(jī)制來(lái)實(shí)現(xiàn),如消息隊(duì)列、共享內(nèi)存等。比如我們可以在共享內(nèi)存中維護(hù)一個(gè)雙端隊(duì)列:

消息產(chǎn)出進(jìn)程不停地往隊(duì)列里添加消息,同時(shí)消息消費(fèi)進(jìn)程不斷地從隊(duì)尾有序地取出這些消息。添加消息的任務(wù)我們稱為 producer,而取出并使用消息的任務(wù),我們稱之為 consumer。這種模式在早期單機(jī)多進(jìn)程模式中比較常見(jiàn), 比如 IO 進(jìn)程把收到的網(wǎng)絡(luò)請(qǐng)求存入本機(jī) MQ,任務(wù)處理進(jìn)程從本機(jī) MQ 中讀取任務(wù)并進(jìn)行處理。
單機(jī) MQ 易于實(shí)現(xiàn),但是缺點(diǎn)也很明顯:因?yàn)橐蕾囉趩螜C(jī) OS 的 IPC 機(jī)制,所以無(wú)法實(shí)現(xiàn)分布式的消息傳遞,并且消息隊(duì)列的容量也受限于單機(jī)資源。
2.基于 DB 的 MQ
即使用存儲(chǔ)組件(如 Mysql 、 Redis 等)存儲(chǔ)消息, 然后在消息的生產(chǎn)側(cè)和消費(fèi)側(cè)實(shí)現(xiàn)消息的生產(chǎn)消費(fèi)邏輯,從而實(shí)現(xiàn) MQ 功能。以 Redis 為例, 可以使用 Redis 自帶的 list 實(shí)現(xiàn)。Redis list 使用 lpush 命令,從隊(duì)列左邊插入數(shù)據(jù);使用 rpop 命令,從隊(duì)列右邊取出數(shù)據(jù)。與單機(jī) MQ 相比, 該方案至少滿足了分布式, 但是仍然帶有很多無(wú)法接受的缺陷。
熱 key 性能問(wèn)題:不論是用 codis 還是 twemproxy 這種集群方案,對(duì)某個(gè)隊(duì)列的讀寫(xiě)請(qǐng)求最終都會(huì)落到同一臺(tái) redis 實(shí)例上,并且無(wú)法通過(guò)擴(kuò)容來(lái)解決問(wèn)題。如果對(duì)某個(gè) list 的并發(fā)讀寫(xiě)非常高,就產(chǎn)生了無(wú)法解決的熱 key,嚴(yán)重可能導(dǎo)致系統(tǒng)崩潰 沒(méi)有消費(fèi)確認(rèn)機(jī)制:每當(dāng)執(zhí)行 rpop 消費(fèi)一條數(shù)據(jù),那條消息就被從 list 中永久刪除了。如果消費(fèi)者消費(fèi)失敗,這條消息也沒(méi)法找回了。 不支持多訂閱者:一條消息只能被一個(gè)消費(fèi)者消費(fèi),rpop 之后就沒(méi)了。如果隊(duì)列中存儲(chǔ)的是應(yīng)用的日志,對(duì)于同一條消息,監(jiān)控系統(tǒng)需要消費(fèi)它來(lái)進(jìn)行可能的報(bào)警,BI 系統(tǒng)需要消費(fèi)它來(lái)繪制報(bào)表,鏈路追蹤需要消費(fèi)它來(lái)繪制調(diào)用關(guān)系……這種場(chǎng)景 redis list 就沒(méi)辦法支持了 不支持二次消費(fèi):一條消息 rpop 之后就沒(méi)了。如果消費(fèi)者程序運(yùn)行到一半發(fā)現(xiàn)代碼有 bug,修復(fù)之后想從頭再消費(fèi)一次就不行了。
針對(duì)上述缺點(diǎn),redis 5.0 開(kāi)始引入 stream 數(shù)據(jù)類型,它是專門(mén)設(shè)計(jì)成為消息隊(duì)列的數(shù)據(jù)結(jié)構(gòu),借鑒了很多 kafka 的設(shè)計(jì),但是隨著很多分布式 MQ 組件的出現(xiàn),仍然顯得不夠友好, 畢竟 Redis 天生就不是用來(lái)做消息轉(zhuǎn)發(fā)的。
3. 專用分布式 MQ 中間件
隨著時(shí)代的發(fā)展,一個(gè)真正的消息隊(duì)列,已經(jīng)不僅僅是一個(gè)隊(duì)列那么簡(jiǎn)單了,業(yè)務(wù)對(duì) MQ 的吞吐量、擴(kuò)展性、穩(wěn)定性、可靠性等都提出了嚴(yán)苛的要求。因此,專用的分布式消息中間件開(kāi)始大量出現(xiàn)。常見(jiàn)的有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等等。
二、消息隊(duì)列設(shè)計(jì)要點(diǎn)
消息隊(duì)列本質(zhì)上是一個(gè)消息的轉(zhuǎn)發(fā)系統(tǒng), 把一次 RPC 就可以直接完成的消息投遞,轉(zhuǎn)換成多次 RPC 間接完成,這其中包含兩個(gè)關(guān)鍵環(huán)節(jié):
1.消息轉(zhuǎn)儲(chǔ);
2.消息投遞:時(shí)機(jī)和對(duì)象;
基于此,消息隊(duì)列的整體設(shè)計(jì)思路是:
確定整體的數(shù)據(jù)流向:如 producer 發(fā)送給 MQ,MQ 轉(zhuǎn)發(fā)給 consumer,consumer 回復(fù)消費(fèi)確認(rèn),消息刪除、消息備份等。 利用 RPC 將數(shù)據(jù)流串起來(lái),最好基于現(xiàn)有的 RPC 框架,盡量做到無(wú)狀態(tài),方便水平擴(kuò)展。 存儲(chǔ)選型,綜合考慮性能、可靠性和開(kāi)發(fā)維護(hù)成本等諸多因素。 消息投遞,消費(fèi)模式 push、pull。 消費(fèi)關(guān)系維護(hù),單播、多播等,可以利用 zk、config server 等保存消費(fèi)關(guān)系。 高級(jí)特性,如可靠投遞,重復(fù)消息,順序消息等, 很多高級(jí)特性之間是相互制約的關(guān)系,這里要充分結(jié)合應(yīng)用場(chǎng)景做出取舍。

1.MQ 基本特性
RPC 通信
MQ 組件要實(shí)現(xiàn)和生產(chǎn)者以及消費(fèi)者進(jìn)行通信功能, 這里涉及到 RPC 通信問(wèn)題。消息隊(duì)列的 RPC,和普通的 RPC 沒(méi)有本質(zhì)區(qū)別。對(duì)于負(fù)載均衡、服務(wù)發(fā)現(xiàn)、序列化協(xié)議等等問(wèn)題都可以借助現(xiàn)有 RPC 框架來(lái)實(shí)現(xiàn),避免重復(fù)造輪子。
存儲(chǔ)系統(tǒng)
存儲(chǔ)可以做成很多方式。比如存儲(chǔ)在內(nèi)存里,存儲(chǔ)在分布式 KV 里,存儲(chǔ)在磁盤(pán)里,存儲(chǔ)在數(shù)據(jù)庫(kù)里等等。但歸結(jié)起來(lái),主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠(yuǎn)大于內(nèi)存)。但并不是每種消息都需要持久化存儲(chǔ)。很多消息對(duì)于投遞性能的要求大于可靠性的要求,且數(shù)量極大(如日志)。這時(shí)候,消息不落地直接暫存內(nèi)存,嘗試幾次 failover,最終投遞出去也未嘗不可。常見(jiàn)的消息隊(duì)列普遍兩種形式都支持。
從速度來(lái)看,理論上,文件系統(tǒng)>分布式 KV(持久化)>分布式文件系統(tǒng)>數(shù)據(jù)庫(kù),而可靠性卻相反。還是要從支持的業(yè)務(wù)場(chǎng)景出發(fā)作出最合理的選擇。
高可用
MQ 的高可用,依賴于 RPC 和存儲(chǔ)的高可用。通常 RPC 服務(wù)自身都具有服務(wù)自動(dòng)發(fā)現(xiàn),負(fù)載均衡等功能,保證了其高可用。存儲(chǔ)的高可用, 例如 Kafka,使用分區(qū)加主備模式,保證每一個(gè)分區(qū)內(nèi)的高可用性,也就是每一個(gè)分區(qū)至少要有一個(gè)備份且需要做數(shù)據(jù)的同步。
推拉模型
push 和 pull 模型各有利弊,兩種模式也都有被市面上成熟的消息中間件選用。
1.慢消費(fèi)
慢消費(fèi)是 push 模型最大的致命傷,如果消費(fèi)者的速度比發(fā)送者的速度慢很多,會(huì)出現(xiàn)兩種惡劣的情況:
1.消息在 broker 的堆積。假設(shè)這些消息都是有用的無(wú)法丟棄的,消息就要一直在 broker 端保存。
2.broker 推送給 consumer 的消息 consumer 無(wú)法處理,此時(shí) consumer 只能拒絕或者返回錯(cuò)誤。
而 pull 模式下,consumer 可以按需消費(fèi),不用擔(dān)心自己處理不了的消息來(lái)騷擾自己,而 broker 堆積消息也會(huì)相對(duì)簡(jiǎn)單,無(wú)需記錄每一個(gè)要發(fā)送消息的狀態(tài),只需要維護(hù)所有消息的隊(duì)列和偏移量就可以了。所以對(duì)于慢消費(fèi),消息量有限且到來(lái)的速度不均勻的情況,pull 模式比較合適。
2.消息延遲與忙等
這是 pull 模式最大的短板。由于主動(dòng)權(quán)在消費(fèi)方,消費(fèi)方無(wú)法準(zhǔn)確地決定何時(shí)去拉取最新的消息。如果一次 pull 取到消息了還可以繼續(xù)去 pull,如果沒(méi)有 pull 取到則需要等待一段時(shí)間重新 pull。
消息投放時(shí)機(jī)
即消費(fèi)者應(yīng)該在什么時(shí)機(jī)消費(fèi)消息。一般有以下三種方式:
攢夠了一定數(shù)量才投放。 到達(dá)了一定時(shí)間就投放。 有新的數(shù)據(jù)到來(lái)就投放。
至于如何選擇,也要結(jié)合具體的業(yè)務(wù)場(chǎng)景來(lái)決定。比如,對(duì)及時(shí)性要求高的數(shù)據(jù),可用采用方式 3 來(lái)完成。
消息投放對(duì)象
不管是 JMS 規(guī)范中的 Topic/Queue,Kafka 里面的 Topic/Partition/ConsumerGroup,還是 AMQP(如 RabbitMQ)的 Exchange 等等, 都是為了維護(hù)消息的消費(fèi)關(guān)系而抽象出來(lái)的概念。本質(zhì)上,消息的消費(fèi)無(wú)外乎點(diǎn)到點(diǎn)的一對(duì)一單播,或一對(duì)多廣播。另外比較特殊的情況是組間廣播、組內(nèi)單播。比較通用的設(shè)計(jì)是,不同的組注冊(cè)不同的訂閱,支持組間廣播。組內(nèi)不同的機(jī)器,如果注冊(cè)一個(gè)相同的 ID,則單播;如果注冊(cè)不同的 ID(如 IP 地址+端口),則廣播。
例如 pulsar 支持的訂閱模型有:
Exclusive:獨(dú)占型,一個(gè)訂閱只能有一個(gè)消息者消費(fèi)消息。 Failover:災(zāi)備型,一個(gè)訂閱同時(shí)只有一個(gè)消費(fèi)者,可以有多個(gè)備份消費(fèi)者。一旦主消費(fèi)者故障則備份消費(fèi)者接管。不會(huì)出現(xiàn)同時(shí)有兩個(gè)活躍的消費(fèi)者。 Shared:共享型,一個(gè)訂閱中同時(shí)可以有多個(gè)消費(fèi)者,多個(gè)消費(fèi)者共享 Topic 中的消息。 Key_Shared:鍵共享型,多個(gè)消費(fèi)者各取一部分消息。
通常會(huì)在公共存儲(chǔ)上維護(hù)廣播關(guān)系,如 config server、zookeeper 等。
2.隊(duì)列高級(jí)特性
常見(jiàn)的高級(jí)特性有可靠投遞、消息丟失、消息重復(fù)、事務(wù)等等,他們并非是 MQ 必備的特性。由于這些特性可能是相互制約的,所以不可能完全兼顧。所以要依照業(yè)務(wù)的需求,來(lái)仔細(xì)衡量各種特性實(shí)現(xiàn)的成本、利弊,最終做出最為合理的設(shè)計(jì)。
可靠投遞
如何保證消息完全不丟失?
直觀的方案是,在任何不可靠操作之前,先將消息落地,然后操作。當(dāng)失敗或者不知道結(jié)果(比如超時(shí))時(shí),消息狀態(tài)是待發(fā)送,定時(shí)任務(wù)不停輪詢所有待發(fā)送消息,最終一定可以送達(dá)。但是,這樣必然導(dǎo)致消息可能會(huì)重復(fù),并且在異常情況下,消息延遲較大。
例如:
producer 往 broker 發(fā)送消息之前,需要做一次落地。 請(qǐng)求到 server 后,server 確保數(shù)據(jù)落地后再告訴客戶端發(fā)送成功。 支持廣播的消息隊(duì)列需要對(duì)每個(gè)接收者,持久化一個(gè)發(fā)送狀態(tài),直到所有接收者都確認(rèn)收到,才可刪除消息。
即對(duì)于任何不能確認(rèn)消息已送達(dá)的情況,都要重推消息。但是,隨著而來(lái)的問(wèn)題就是消息重復(fù)。在消息重復(fù)和消息丟失之間,無(wú)法兼顧,要結(jié)合應(yīng)用場(chǎng)景做出取舍。
消費(fèi)確認(rèn)
當(dāng) broker 把消息投遞給消費(fèi)者后,消費(fèi)者可以立即確認(rèn)收到了消息。但是,有些情況消費(fèi)者可能需要再次接收該消息(比如收到消息、但是處理失?。?,即消費(fèi)者主動(dòng)要求重發(fā)消息。所以,要允許消費(fèi)者主動(dòng)進(jìn)行消費(fèi)確認(rèn)。
順序消息
對(duì)于 push 模式,要求支持分區(qū)且單分區(qū)只支持一個(gè)消費(fèi)者消費(fèi),并且消費(fèi)者只有確認(rèn)一個(gè)消息消費(fèi)后才能 push 另外一個(gè)消息,還要發(fā)送者保證發(fā)送順序唯一。
對(duì)于 pull 模式,比如 kafka 的做法:
producer 對(duì)應(yīng) partition,并且單線程。 consumer 對(duì)應(yīng) partition,消費(fèi)確認(rèn)(或批量確認(rèn)),單線程消費(fèi)。
但是這樣也只是實(shí)現(xiàn)了消息的分區(qū)有序性,并不一定全局有序??傮w而言,要求消息有序的 MQ 場(chǎng)景還是比較少的。
三、Kafka
Kafka 是一個(gè)分布式發(fā)布訂閱消息系統(tǒng)。它以高吞吐、可持久化、可水平擴(kuò)展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用(如 Storm、Spark、Flink)。在大數(shù)據(jù)系統(tǒng)中,數(shù)據(jù)需要在各個(gè)子系統(tǒng)中高性能、低延遲的不停流轉(zhuǎn)。傳統(tǒng)的企業(yè)消息系統(tǒng)并不是非常適合大規(guī)模的數(shù)據(jù)處理,但 Kafka 出現(xiàn)了,它可以高效的處理實(shí)時(shí)消息和離線消息,降低編程復(fù)雜度,使得各個(gè)子系統(tǒng)可以快速高效的進(jìn)行數(shù)據(jù)流轉(zhuǎn),Kafka 承擔(dān)高速數(shù)據(jù)總線的作用。
kafka 基礎(chǔ)概念
BrokerKafka 集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為 broker。 TopicTopic 在邏輯上可以被認(rèn)為是一個(gè) queue,每條消費(fèi)都必須指定它的 Topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè) queue 里。為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個(gè)或多個(gè) Partition,每個(gè) Partition 在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè) Partition 的所有消息和索引文件。 PartitionParition 是物理上的概念,每個(gè) Topic 包含一個(gè)或多個(gè) Partition。 Producer負(fù)責(zé)發(fā)布消息到 Kafka broker。 Consumer消息消費(fèi)者,向 Kafka broker 讀取消息的客戶端。 Consumer Group每個(gè) Consumer 屬于一個(gè)特定的 Consumer Group(可為每個(gè) Consumer 指定 group name,若不指定 group name 則屬于默認(rèn)的 group)。

一個(gè)典型的 kafka 集群包含若干 Producer,若干個(gè) Broker(kafka 支持水平擴(kuò)展)、若干個(gè) Consumer Group,以及一個(gè) zookeeper 集群。Producer 使用 push 模式將消息發(fā)布到 broker。consumer 使用 pull 模式從 broker 訂閱并消費(fèi)消息。多個(gè) broker 協(xié)同工作,producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中。kafka 通過(guò) zookeeper 管理集群配置及服務(wù)協(xié)同。
這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)。Kafka 有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn),producer 發(fā)送消息到 broker 的過(guò)程是 push,而 consumer 從 broker 消費(fèi)消息的過(guò)程是 pull,主動(dòng)去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer。
Producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù) Paritition 機(jī)制選擇將其存儲(chǔ)到哪一個(gè) Partition。如果 Partition 機(jī)制設(shè)置合理,所有消息可以均勻分布到不同的 Partition 里,這樣就實(shí)現(xiàn)了負(fù)載均衡。如果一個(gè) Topic 對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器 I/O 將會(huì)成為這個(gè) Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫(xiě)入不同 broker 的不同 Partition 里,極大的提高了吞吐率。
Kafka 特點(diǎn)
優(yōu)點(diǎn):
高性能:?jiǎn)螜C(jī)測(cè)試能達(dá)到 100w tps 低延時(shí):生產(chǎn)和消費(fèi)的延時(shí)都很低,e2e 的延時(shí)在正常的 cluster 中也很低 可用性高:replicate+ isr + 選舉 機(jī)制保證 工具鏈成熟:監(jiān)控 運(yùn)維 管理 方案齊全 生態(tài)成熟:大數(shù)據(jù)場(chǎng)景必不可少 kafka stream
不足:
無(wú)法彈性擴(kuò)容:對(duì) partition 的讀寫(xiě)都在 partition leader 所在的 broker,如果該 broker 壓力過(guò)大,也無(wú)法通過(guò)新增 broker 來(lái)解決問(wèn)題 擴(kuò)容成本高:集群中新增的 broker 只會(huì)處理新 topic,如果要分擔(dān)老 topic-partition 的壓力,需要手動(dòng)遷移 partition,這時(shí)會(huì)占用大量集群帶寬 消費(fèi)者新加入和退出會(huì)造成整個(gè)消費(fèi)組 rebalance:導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi),影響消費(fèi)速度,增加延遲 partition 過(guò)多會(huì)使得性能顯著下降:ZK 壓力大,broker 上 partition 過(guò)多讓磁盤(pán)順序?qū)憥缀跬嘶呻S機(jī)寫(xiě)
高吞吐機(jī)制
順序存取
如果把消息以隨機(jī)的方式寫(xiě)入到磁盤(pán),那么磁盤(pán)首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤(pán)上就要找到對(duì)應(yīng)柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過(guò)程相對(duì)內(nèi)存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫(xiě)帶來(lái)的時(shí)間消耗,kafka 采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)。
頁(yè)緩存
即使是順序存取,但是頻繁的 I/O 操作仍然會(huì)造成磁盤(pán)的性能瓶頸,所以 kafka 使用了頁(yè)緩存和零拷貝技術(shù)。當(dāng)進(jìn)程準(zhǔn)備讀取磁盤(pán)上的文件內(nèi)容時(shí), 操作系統(tǒng)會(huì)先查看待讀取的數(shù)據(jù)是否在頁(yè)緩存中,如果存在則直接返回?cái)?shù)據(jù), 從而避免了對(duì)物理磁盤(pán)的 I/O 操作;
如果沒(méi)有命中, 則操作系統(tǒng)會(huì)向磁盤(pán)發(fā)起讀取請(qǐng)求并將讀取的數(shù)據(jù)頁(yè)存入頁(yè)緩存, 之后再將數(shù)據(jù)返回給進(jìn)程。一個(gè)進(jìn)程需要將數(shù)據(jù)寫(xiě)入磁盤(pán), 那么操作系統(tǒng)也會(huì)檢測(cè)數(shù)據(jù)對(duì)應(yīng)的頁(yè)是否在頁(yè)緩存中,如果不存在, 則會(huì)先在頁(yè)緩存中添加相應(yīng)的頁(yè), 最后將數(shù)據(jù)寫(xiě)入對(duì)應(yīng)的頁(yè)。被修改過(guò)后的頁(yè)也就變成了臟頁(yè), 操作系統(tǒng)會(huì)在合適的時(shí)間把臟頁(yè)中的數(shù)據(jù)寫(xiě)入磁盤(pán), 以保持?jǐn)?shù)據(jù)的 一 致性。
Kafka 中大量使用了頁(yè)緩存, 這是 Kafka 實(shí)現(xiàn)高吞吐的重要因素之 一 。雖然消息都是先被寫(xiě)入頁(yè)緩存,然后由操作系統(tǒng)負(fù)責(zé)具體的刷盤(pán)任務(wù)的, 但在 Kafka 中同樣提供了同步刷盤(pán)及間斷性強(qiáng)制刷盤(pán)(fsync),可以通過(guò)參數(shù)來(lái)控制。
同步刷盤(pán)能夠保證消息的可靠性,避免因?yàn)殄礄C(jī)導(dǎo)致頁(yè)緩存數(shù)據(jù)還未完成同步時(shí)造成的數(shù)據(jù)丟失。但是實(shí)際使用上,我們沒(méi)必要去考慮這樣的因素以及這種問(wèn)題帶來(lái)的損失,消息可靠性可以由多副本來(lái)解決,同步刷盤(pán)會(huì)帶來(lái)性能的影響。
頁(yè)緩存的好處:
I/O Scheduler 會(huì)將連續(xù)的小塊寫(xiě)組裝成大塊的物理寫(xiě)從而提高性能; I/O Scheduler 會(huì)嘗試將一些寫(xiě)操作重新按順序排好,從而減少磁頭移動(dòng)時(shí)間; 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存); 讀操作可以直接在 Page Cache 內(nèi)進(jìn)行,如果消費(fèi)和生產(chǎn)速度相當(dāng),甚至不需要通過(guò)物理磁盤(pán)交換數(shù)據(jù); 如果進(jìn)程重啟,JVM 內(nèi)的 Cache 會(huì)失效,但 Page Cache 仍然可用。
零拷貝
零拷貝技術(shù)可以減少 CPU 的上下文切換和數(shù)據(jù)拷貝次數(shù)。
常規(guī)方式

應(yīng)用程序一次常規(guī)的數(shù)據(jù)請(qǐng)求過(guò)程,發(fā)生了 4 次拷貝,2 次 DMA 和 2 次 CPU,而 CPU 發(fā)生了 4 次的切換。(DMA 簡(jiǎn)單理解就是,在進(jìn)行 I/O 設(shè)備和內(nèi)存的數(shù)據(jù)傳輸?shù)臅r(shí)候,數(shù)據(jù)搬運(yùn)的工作全部交給 DMA 控制器,而 CPU 不再參與任何與數(shù)據(jù)搬運(yùn)相關(guān)的事情)
零拷貝的方式

通過(guò)零拷貝優(yōu)化,CPU 只發(fā)生了 2 次的上下文切換和 3 次數(shù)據(jù)拷貝。
批量發(fā)送
Kafka 允許進(jìn)行批量發(fā)送消息,先將消息緩存在內(nèi)存中,然后一次請(qǐng)求批量發(fā)送出去,這種策略將大大減少服務(wù)端的 I/O 次數(shù)。
數(shù)據(jù)壓縮
Kafka 還支持對(duì)消息集合進(jìn)行壓縮,Producer 可以通過(guò) GZIP 或 Snappy 格式對(duì)消息集合進(jìn)行壓縮,Producer 壓縮之后,在 Consumer 需進(jìn)行解壓,雖然增加了 CPU 的工作,但在對(duì)大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是 CPU,所以這個(gè)成本很值得。
高可用機(jī)制
副本
Producer 在發(fā)布消息到某個(gè) Partition 時(shí),先通過(guò) ZooKeeper 找到該 Partition 的 Leader,然后無(wú)論該 Topic 的 Replication Factor 為多少,Producer 只將該消息發(fā)送到該 Partition 的 Leader。Leader 會(huì)將該消息寫(xiě)入其本地 Log。
每個(gè) Follower 都從 Leader pull 數(shù)據(jù)。這種方式上,F(xiàn)ollower 存儲(chǔ)的數(shù)據(jù)順序與 Leader 保持一致。Follower 在收到該消息后,向 Leader 發(fā)送 ACK, 并把消息寫(xiě)入其 Log。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,該消息就被認(rèn)為已經(jīng) commit 了,Leader 將增加 HW 并且向 Producer 發(fā)送 ACK。
為了提高性能,每個(gè) Follower 在接收到數(shù)據(jù)后就立馬向 Leader 發(fā)送 ACK,而非等到數(shù)據(jù)寫(xiě)入 Log 中。因此,對(duì)于已經(jīng) commit 的消息,Kafka 只能保證它被存于多個(gè) Replica 的內(nèi)存中,而不能保證它們被持久化到磁盤(pán)中,也就不能完全保證異常發(fā)生后該條消息一定能被 Consumer 消費(fèi)。Consumer 讀消息也是從 Leader 讀取,只有被 commit 過(guò)的消息才會(huì)暴露給 Consumer。Kafka Replication 的數(shù)據(jù)流如下圖所示:

對(duì)于 Kafka 而言,定義一個(gè) Broker 是否“活著”包含兩個(gè)條件:
一是它必須維護(hù)與 ZooKeeper 的 session(這個(gè)通過(guò) ZooKeeper 的 Heartbeat 機(jī)制來(lái)實(shí)現(xiàn))。 二是 Follower 必須能夠及時(shí)將 Leader 的消息復(fù)制過(guò)來(lái),不能“落后太多”。
Leader 會(huì)跟蹤與其保持同步的 Replica 列表,該列表稱為 ISR(即 in-sync Replica)。如果一個(gè) Follower 宕機(jī),或者落后太多,Leader 將把它從 ISR 中移除。這里所描述的“落后太多”指 Follower 復(fù)制的消息落后于 Leader 后的條數(shù)超過(guò)預(yù)定值或者 Follower 超過(guò)一定時(shí)間未向 Leader 發(fā)送 fetch 請(qǐng)求。Kafka 的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。
完全同步復(fù)制要求所有能工作的 Follower 都復(fù)制完,這條消息才會(huì)被認(rèn)為 commit,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是 Kafka 非常重要的一個(gè)特性)。異步復(fù)制方式下,F(xiàn)ollower 異步的從 Leader 復(fù)制數(shù)據(jù),數(shù)據(jù)只要被 Leader 寫(xiě)入 log 就被認(rèn)為已經(jīng) commit,這種情況下如果 Follower 都復(fù)制完都落后于 Leader,而如果 Leader 突然宕機(jī),則會(huì)丟失數(shù)據(jù)。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower 可以批量的從 Leader 復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫(xiě)磁盤(pán)),極大減少了 Follower 與 Leader 的差距。
一條消息只有被 ISR 里的所有 Follower 都從 Leader 復(fù)制過(guò)去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫(xiě)進(jìn)了 Leader,還沒(méi)來(lái)得及被任何 Follower 復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer 無(wú)法消費(fèi)這些數(shù)據(jù))。而對(duì)于 Producer 而言,它可以選擇是否等待消息 commit。這種機(jī)制確保了只要 ISR 有一個(gè)或以上的 Follower,一條被 commit 的消息就不會(huì)丟失。
故障恢復(fù)
Leader 故障
leader 發(fā)生故障后,會(huì)從 ISR 中選出一個(gè)新的 leader,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)。注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
Kafka 在 ZooKeeper 中動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas),這個(gè) ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成員才有被選為 Leader 的可能。在這種模式下,對(duì)于 f+1 個(gè) Replica,一個(gè) Partition 能在保證不丟失已經(jīng) commit 的消息的前提下容忍 f 個(gè) Replica 的失敗。

LEO:每個(gè)副本最大的 offset。
HW:消費(fèi)者能見(jiàn)到的最大的 offset,ISR 隊(duì)列中最小的 LEO。
Follower 故障
follower 發(fā)生故障后會(huì)被臨時(shí)踢出 ISR 集合,待該 follower 恢復(fù)后,follower 會(huì) 讀取本地磁盤(pán)記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開(kāi)始向 leader 進(jìn)行同步數(shù)據(jù)操作。等該 follower 的 LEO 大于等于該 partition 的 HW,即 follower 追上 leader 后,就可以重新加入 ISR 了。
擴(kuò)展性
由于 Broker 存儲(chǔ)著特定分區(qū)的數(shù)據(jù), 因此,不管是 Broker 還是分區(qū)的擴(kuò)縮容,都是比較復(fù)雜的,屬于典型的“有狀態(tài)服務(wù)”擴(kuò)縮容問(wèn)題。接下來(lái),我們看一下 Pulsar 是怎么針對(duì) kafka 的不足進(jìn)行優(yōu)化的。
四、Pulsar
Apache Pulsar 是 Apache 軟件基金會(huì)頂級(jí)項(xiàng)目,是下一代云原生分布式消息流平臺(tái),集消息、存儲(chǔ)、輕量化函數(shù)式計(jì)算為一體。采用計(jì)算與存儲(chǔ)分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲(chǔ)特性。在消息領(lǐng)域,Pulsar 是第一個(gè)將存儲(chǔ)計(jì)算分離云原生架構(gòu)落地的開(kāi)源項(xiàng)目。
服務(wù)和存儲(chǔ)分離

在 kafka 的基礎(chǔ)上,把數(shù)據(jù)存儲(chǔ)功能從 Broker 中分離出來(lái),Broker 僅面向生產(chǎn)者、消費(fèi)者提供數(shù)據(jù)讀寫(xiě)能力,但其自身并不存儲(chǔ)數(shù)據(jù)。而在 Broker 層下面使用 Bookie 作為存儲(chǔ)層,承擔(dān)具體的數(shù)據(jù)存儲(chǔ)職責(zé)。在 Pulsar 中,broker 的含義和 kafka 中的 broker 是一致的,就是一個(gè)運(yùn)行的 Pulsar 實(shí)例, 提供多個(gè)分區(qū)的讀寫(xiě)服務(wù)。由于 broker 層不在承擔(dān)數(shù)據(jù)存儲(chǔ)職責(zé),使得 Broker 層成為無(wú)狀態(tài)服務(wù)。這樣一來(lái),Broker 的擴(kuò)縮容就變得非常簡(jiǎn)單。
相比之下,服務(wù)存儲(chǔ)集于一體的 Kafka 就非常難以擴(kuò)容。
Broker 和 Bookie 相互獨(dú)立,方便實(shí)現(xiàn)獨(dú)立的擴(kuò)展以及獨(dú)立的容錯(cuò) Broker 無(wú)狀態(tài),便于快速上、下線,更加適合于云原生場(chǎng)景 分區(qū)存儲(chǔ)不受限于單個(gè)節(jié)點(diǎn)存儲(chǔ)容量 Bookie 數(shù)據(jù)分布均勻

分片存儲(chǔ)

1.在 Kafka 分區(qū)(Partition)概念的基礎(chǔ)上,按照時(shí)間或大小,把分區(qū)切分成分片(Segment)。
2.同一個(gè)分區(qū)的分片,分散存儲(chǔ)在集群中所有的 Bookie 節(jié)點(diǎn)上。
3.同一個(gè)分片,擁有多個(gè)副本,副本數(shù)量可以指定,存儲(chǔ)于不同的 Bookie 節(jié)點(diǎn)。
Pulsar 性能
和 Kafka 一樣,Pulsar 也使用了順序讀寫(xiě)和零拷貝等技術(shù)來(lái)提高系統(tǒng)的性能。
此外,Pulsar 還設(shè)計(jì)了分層緩存機(jī)制,在服務(wù)層和存儲(chǔ)層都做了分層緩存,來(lái)提高性能。
生產(chǎn)者發(fā)送消息時(shí),調(diào)用 Bookie 層寫(xiě)入消息時(shí),同時(shí)將消息寫(xiě)入 broker 緩存中。 實(shí)時(shí)消費(fèi)時(shí)(追尾讀),首先從 broker 緩存中讀取數(shù)據(jù),避免從持久層 bookie 中讀取,從而降低投遞延遲。 讀取歷史消息(追趕讀)場(chǎng)景中,bookie 會(huì)將磁盤(pán)消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤(pán)數(shù)據(jù),降低讀取延時(shí)。

Pulsar 擴(kuò)展性
分片存儲(chǔ)解決了分區(qū)容量受單節(jié)點(diǎn)存儲(chǔ)空間限制的問(wèn)題,當(dāng)容量不夠時(shí),可以通過(guò)擴(kuò)容 Bookie 節(jié)點(diǎn)的方式支撐更多的分區(qū)數(shù)據(jù),也解決了分區(qū)數(shù)據(jù)傾斜問(wèn)題,數(shù)據(jù)可以均勻的分配在 Bookie 節(jié)點(diǎn)上。
Broker 和 Bookie 靈活的容錯(cuò)以及無(wú)縫的擴(kuò)容能力讓 Apache Pulsar 具備非常高的可用性,實(shí)現(xiàn)了無(wú)限制的分區(qū)存儲(chǔ)。

Broker 擴(kuò)展
在 Pulsar 中 Broker 是無(wú)狀態(tài)的,可以通過(guò)增加節(jié)點(diǎn)的方式實(shí)現(xiàn)快速擴(kuò)容。當(dāng)需要支持更多的消費(fèi)者或生產(chǎn)者時(shí),可以簡(jiǎn)單地添加更多的 Broker 節(jié)點(diǎn)來(lái)滿足業(yè)務(wù)需求。Pulsar 支持自動(dòng)的分區(qū)負(fù)載均衡,在 Broker 節(jié)點(diǎn)的資源使用率達(dá)到閾值時(shí),會(huì)將負(fù)載遷移到負(fù)載較低的 Broker 節(jié)點(diǎn)。新增 Broker 節(jié)點(diǎn)時(shí),分區(qū)也將在 Brokers 中做平衡遷移,一些分區(qū)的所有權(quán)會(huì)轉(zhuǎn)移到新的 Broker 節(jié)點(diǎn)。
Bookie 擴(kuò)展
存儲(chǔ)層的擴(kuò)容,通過(guò)增加 Bookie 節(jié)點(diǎn)來(lái)實(shí)現(xiàn)。通過(guò)資源感知和數(shù)據(jù)放置策略,流量將自動(dòng)切換到新的 Apache Bookie 中,整個(gè)過(guò)程不會(huì)涉及到不必要的數(shù)據(jù)搬遷。即擴(kuò)容時(shí),不會(huì)將舊數(shù)據(jù)從現(xiàn)有存儲(chǔ)節(jié)點(diǎn)重新復(fù)制到新存儲(chǔ)節(jié)點(diǎn)。

如圖所示,起始狀態(tài)有四個(gè)存儲(chǔ)節(jié)點(diǎn),Bookie1, Bookie2, Bookie3, Bookie4,以 Topic1-Part2 為例,當(dāng)這個(gè)分區(qū)的最新的存儲(chǔ)分片是 SegmentX 時(shí),對(duì)存儲(chǔ)層擴(kuò)容,添加了新的 Bookie 節(jié)點(diǎn),BookieX,BookieY,那么當(dāng)存儲(chǔ)分片滾動(dòng)之后,新生成的存儲(chǔ)分片, SegmentX+1,SegmentX+2,會(huì)優(yōu)先選擇新的 Bookie 節(jié)點(diǎn)(BookieX,BookieY)來(lái)保存數(shù)據(jù)。
Pulsar 可用性
Broker 容錯(cuò)
如下圖,假設(shè)當(dāng)存儲(chǔ)分片滾動(dòng)到 SegmentX 時(shí),Broker2 節(jié)點(diǎn)失敗。此時(shí)生產(chǎn)者和消費(fèi)者向其他的 Broker 發(fā)起請(qǐng)求,這個(gè)過(guò)程會(huì)觸發(fā)分區(qū)的所有權(quán)轉(zhuǎn)移,即將 Broker2 擁有的分區(qū) Topic1-Part2 的所有權(quán)轉(zhuǎn)移到其他的 Broker(Broker3)。
由于數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)服務(wù)分離,所以新 Broker 接管分區(qū)的所有權(quán)時(shí),它不需要復(fù)制 Partiton 的數(shù)據(jù)。新的分區(qū) Owner(Broker3)會(huì)產(chǎn)生一個(gè)新的分片 SegmentX+1, 如果有新數(shù)據(jù)到來(lái),會(huì)存儲(chǔ)在新的分片 Segment x+1 上,不會(huì)影響分區(qū)的可用性。
即當(dāng)某個(gè) Broker 實(shí)例故障時(shí),整個(gè)集群的消息存儲(chǔ)能力仍然完好。此時(shí),集群只是喪失了特定分區(qū)的消息服務(wù),只需要把這些分區(qū)的服務(wù)權(quán)限分配給其他 Broker 即可。
注意,和 Kafka 一樣, Pulsar 的一個(gè)分區(qū)仍然只能由一個(gè) Broker 提供服務(wù),否則無(wú)法保證消息的分區(qū)有序性。

Bookie 容錯(cuò)
如下圖,假設(shè) Bookie 2 上的 Segment 4 損壞。Bookie Auditor 會(huì)檢測(cè)到這個(gè)錯(cuò)誤并進(jìn)行復(fù)制修復(fù)。Bookie 中的副本修復(fù)是 Segment 級(jí)別的多對(duì)多快速修復(fù),BookKeeper 可以從 Bookie 3 和 Bookie 4 讀取 Segment 4 中的消息,并在 Bookie 1 處修復(fù) Segment 4。如果是 Bookie 節(jié)點(diǎn)故障,這個(gè) Bookie 節(jié)點(diǎn)上所有的 Segment 會(huì)按照上述方式復(fù)制到其他的 Bookie 節(jié)點(diǎn)。
所有的副本修復(fù)都在后臺(tái)進(jìn)行,對(duì) Broker 和應(yīng)用透明,Broker 會(huì)產(chǎn)生新的 Segment 來(lái)處理寫(xiě)入請(qǐng)求,不會(huì)影響分區(qū)的可用性。

Pulsar 其他特性
基于上述的設(shè)計(jì)特點(diǎn),Pulsar 提供了很多特性。
讀寫(xiě)分離
Pulsar 另外一個(gè)有吸引力的特性是提供了讀寫(xiě)分離的能力,讀寫(xiě)分離保證了在有大量滯后消費(fèi)(磁盤(pán) IO 會(huì)增加)時(shí),不會(huì)影響服務(wù)的正常運(yùn)行,尤其是不會(huì)影響到數(shù)據(jù)的寫(xiě)入。讀寫(xiě)分離的能力由 Bookie 提供,簡(jiǎn)單說(shuō)一下 Bookie 存儲(chǔ)涉及到的概念:
Journals:Journal 文件包含了 Bookie 事務(wù)日志,在 Ledger (可以認(rèn)為是分片的一部分) 更新之前,Journal 保證描述更新的事務(wù)寫(xiě)入到 Non-volatile 的存儲(chǔ)介質(zhì)上; Entry logger:Entry 日志文件管理寫(xiě)入的 Entry,來(lái)自不同 ledger 的 entry 會(huì)被聚合然后順序?qū)懭耄?/section> Index files:每個(gè) Ledger 都有一個(gè)對(duì)應(yīng)的索引文件,記錄數(shù)據(jù)在 Entry 日志文件中的 Offset 信息。
Entry 的讀寫(xiě)入過(guò)程下圖所示,數(shù)據(jù)的寫(xiě)入流程:
數(shù)據(jù)首先會(huì)寫(xiě)入 Journal,寫(xiě)入 Journal 的數(shù)據(jù)會(huì)實(shí)時(shí)落到磁盤(pán); 然后,數(shù)據(jù)寫(xiě)入到 Memtable ,Memtable 是讀寫(xiě)緩存; 寫(xiě)入 Memtable 之后,對(duì)寫(xiě)入請(qǐng)求進(jìn)行響應(yīng); Memtable 寫(xiě)滿之后,會(huì) Flush 到 Entry Logger 和 Index cache,Entry Logger 中保存了數(shù)據(jù),Index cache 保存了數(shù)據(jù)的索引信息,然后由后臺(tái)線程將 Entry Logger 和 Index cache 數(shù)據(jù)落到磁盤(pán)。
數(shù)據(jù)的讀取流程:
如果是 Tailing read 請(qǐng)求,直接從 Memtable 中讀取 Entry; 如果是 Catch-up read(滯后消費(fèi))請(qǐng)求,先讀取 Index 信息,然后索引從 Entry Logger 文件讀取 Entry。

一般在進(jìn)行 Bookie 的配置時(shí),會(huì)將 Journal 和 Ledger 存儲(chǔ)磁盤(pán)進(jìn)行隔離,減少 Ledger 對(duì)于 Journal 寫(xiě)入的影響,并且推薦 Journal 使用性能較好的 SSD 磁盤(pán),讀寫(xiě)分離主要體現(xiàn)在:
寫(xiě)入 Entry 時(shí),Journal 中的數(shù)據(jù)需要實(shí)時(shí)寫(xiě)到磁盤(pán),Ledger 的數(shù)據(jù)不需要實(shí)時(shí)落盤(pán),通過(guò)后臺(tái)線程批量落盤(pán),因此寫(xiě)入的性能主要受到 Journal 磁盤(pán)的影響; 讀取 Entry 時(shí),首先從 Memtable 讀取,命中則返回;如果不命中,再?gòu)?Ledger 磁盤(pán)中讀取,所以對(duì)于 Catch-up read 的場(chǎng)景,讀取數(shù)據(jù)會(huì)影響 Ledger 磁盤(pán)的 IO,對(duì) Journal 磁盤(pán)沒(méi)有影響,也就不會(huì)影響到數(shù)據(jù)的寫(xiě)入。
所以,數(shù)據(jù)寫(xiě)入是主要是受 Journal 磁盤(pán)的負(fù)載影響,不會(huì)受 Ledger 磁盤(pán)的影響。另外,Segment 存儲(chǔ)的多個(gè)副本都可以提供讀取服務(wù),相比于主從副本的設(shè)計(jì),Apache Pulsar 可以提供更好的數(shù)據(jù)讀取能力。
通過(guò)以上分析,Apache Pulsar 使用 Apache BookKeeper 作為數(shù)據(jù)存儲(chǔ),可以帶來(lái)下列的收益:
支持將多個(gè) Ledger 的數(shù)據(jù)寫(xiě)入到同一個(gè) Entry logger 文件,可以避免分區(qū)膨脹帶來(lái)的性能下降問(wèn)題 支持讀寫(xiě)分離,可以在滯后消費(fèi)場(chǎng)景導(dǎo)致磁盤(pán) IO 上升時(shí),保證數(shù)據(jù)寫(xiě)入的不受影響 支持全副本讀取,可以充分利用存儲(chǔ)副本的數(shù)據(jù)讀取能力
多種消費(fèi)模型
Pulsar 提供了多種訂閱方式來(lái)消費(fèi)消息,分為三種類型:獨(dú)占(Exclusive),故障切換(Failover)或共享(Share)。
Exclusive 獨(dú)占訂閱 :在任何時(shí)間,一個(gè)消費(fèi)者組(訂閱)中有且只有一個(gè)消費(fèi)者來(lái)消費(fèi) Topic 中的消息。 Failover 故障切換:多個(gè)消費(fèi)者(Consumer)可以附加到同一訂閱。但是,一個(gè)訂閱中的所有消費(fèi)者,只會(huì)有一個(gè)消費(fèi)者被選為該訂閱的主消費(fèi)者。其他消費(fèi)者將被指定為故障轉(zhuǎn)移消費(fèi)者。當(dāng)主消費(fèi)者斷開(kāi)連接時(shí),分區(qū)將被重新分配給其中一個(gè)故障轉(zhuǎn)移消費(fèi)者,而新分配的消費(fèi)者將成為新的主消費(fèi)者。發(fā)生這種情況時(shí),所有未確認(rèn)(ack)的消息都將傳遞給新的主消費(fèi)者。 Share 共享訂閱:使用共享訂閱,在同一個(gè)訂閱背后,用戶按照應(yīng)用的需求掛載任意多的消費(fèi)者。訂閱中的所有消息以循環(huán)分發(fā)形式發(fā)送給訂閱背后的多個(gè)消費(fèi)者,并且一個(gè)消息僅傳遞給一個(gè)消費(fèi)者。
當(dāng)消費(fèi)者斷開(kāi)連接時(shí),所有傳遞給它但是未被確認(rèn)(ack)的消息將被重新分配和組織,以便發(fā)送給該訂閱上剩余的剩余消費(fèi)者。

多種 ACK 模型
消息確認(rèn)(ACK)的目的就是保證當(dāng)發(fā)生故障后,消費(fèi)者能夠從上一次停止的地方恢復(fù)消費(fèi),保證既不會(huì)丟失消息,也不會(huì)重復(fù)處理已經(jīng)確認(rèn)(ACK)的消息。在 Pulsar 中,每個(gè)訂閱中都使用一個(gè)專門(mén)的數(shù)據(jù)結(jié)構(gòu)–游標(biāo)(Cursor)來(lái)跟蹤訂閱中的每條消息的確認(rèn)(ACK)狀態(tài)。每當(dāng)消費(fèi)者在分區(qū)上確認(rèn)消息時(shí),游標(biāo)都會(huì)更新。
Pulsar 提供兩種消息確認(rèn)方法:
單條確認(rèn)(Individual Ack),單獨(dú)確認(rèn)一條消息。被確認(rèn)后的消息將不會(huì)被重新傳遞 累積確認(rèn)(Cumulative Ack),通過(guò)累積確認(rèn),消費(fèi)者只需要確認(rèn)它收到的最后一條消息

上圖說(shuō)明了單條確認(rèn)和累積確認(rèn)的差異(灰色框中的消息被確認(rèn)并且不會(huì)被重新傳遞)。對(duì)于累計(jì)確認(rèn),M12 之前的消息被標(biāo)記為 Acked。對(duì)于單獨(dú)進(jìn)行 ACK,僅確認(rèn)消息 M7 和 M12, 在消費(fèi)者失敗的情況下,除了 M7 和 M12 之外,其他所有消息將被重新傳送。
— 本文結(jié)束 —

●?漫談設(shè)計(jì)模式在 Spring 框架中的良好實(shí)踐
關(guān)注我,回復(fù) 「加群」 加入各種主題討論群。
對(duì)「服務(wù)端思維」有期待,請(qǐng)?jiān)谖哪c(diǎn)個(gè)在看
喜歡這篇文章,歡迎轉(zhuǎn)發(fā)、分享朋友圈


