淺談RabbitMQ的基石—高級消息隊(duì)列協(xié)議(AMQP)
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

?
? 前言
自從去年做了不少流式系統(tǒng)(Flink也好,Spark Streaming也好)對接RabbitMQ的實(shí)時作業(yè)。之前一直都在Kafka的領(lǐng)域里摸爬滾打,對RabbitMQ只是有淺薄的了解而已。隨著自己逐漸把RabbitMQ的官方文檔大致翻完,了解到它是高級消息隊(duì)列協(xié)議(Advanced Message Queuing Protocol, AMQP)的一種標(biāo)準(zhǔn)實(shí)現(xiàn)。也就是說,搞清楚AMQP是掌握好RabbitMQ哲學(xué)的基礎(chǔ)。
當(dāng)前AMQP的最新版本為1.0,而主要使用的(也是RabbitMQ實(shí)現(xiàn)的)版本為0-9-1。這兩個版本之間的差別非常大,本文抄錄的是AMQP 0-9-1的部分細(xì)節(jié)。
AMQP及其模型
通俗地講,AMQP是一個專門為消息中間件設(shè)計(jì)的、開放標(biāo)準(zhǔn)的應(yīng)用層協(xié)議,它規(guī)定了消息系統(tǒng)中三大組件——消息服務(wù)器/代理節(jié)點(diǎn)(server/broker)、生產(chǎn)者/發(fā)布者(producer/publisher)、消費(fèi)者/訂閱者(consumer/subscriber)之間的通信規(guī)范,以及代理節(jié)點(diǎn)的設(shè)計(jì)規(guī)范等。
AMQP采用的模型就叫做高級消息隊(duì)列模型,即AMQ模型,它的組成可以用下面的簡圖來表示。
下面就圖中出現(xiàn)的一些名詞進(jìn)行解釋。
交換器(exchange):負(fù)責(zé)將生產(chǎn)者發(fā)來的消息按照特定的路由關(guān)鍵字(routing key)投遞到相應(yīng)的隊(duì)列。
隊(duì)列(queue):代理節(jié)點(diǎn)中存儲將要被消費(fèi)的消息的載體。
綁定(binding):交換器與隊(duì)列之間的映射關(guān)系,可以理解為消息的路由規(guī)則。
AMQP實(shí)體(AMQP entity):交換器、隊(duì)列和綁定三者合起來就稱為一個AMQP實(shí)體,圖中未示出。交換器、隊(duì)列和綁定都可以有一個或多個。
虛擬主機(jī)(virtual host):在代理節(jié)點(diǎn)上邏輯劃分的隔離的環(huán)境,其內(nèi)部包含一個或多個AMQP實(shí)體,且虛擬主機(jī)之間互不影響。虛擬主機(jī)可以復(fù)用節(jié)點(diǎn),并實(shí)現(xiàn)權(quán)限管理和多租戶。
連接(connection):發(fā)布者、消費(fèi)者與代理節(jié)點(diǎn)之間建立的連接,為了保證可靠性,一般都是TCP長連接。
通道(channel):對連接的輕量級復(fù)用,主要針對多線程的發(fā)布者、消費(fèi)者,因?yàn)榻⒍鄠€TCP連接是很貴的操作,頻繁建立和銷毀連接也是不科學(xué)的。
接下來對交換器和隊(duì)列這兩個比較重要的組件進(jìn)行介紹,順便牽出一些其他的東西。
交換器
交換器在AMQP實(shí)體中負(fù)責(zé)消息路由。它的路由目的地除了由用戶設(shè)置的綁定規(guī)則來決定之外,還與交換器的類型有關(guān)。AMQP定義了幾種默認(rèn)的交換器。
直連交換器(direct exchange)
直連交換器非常簡單,它檢查綁定關(guān)鍵字(binding key)與路由關(guān)鍵字(routing key),只要兩者相同,即進(jìn)行投遞。

扇出交換器(fanout exchange)
扇出交換器比直連交換器更簡單,它會直接將消息路由到所有與它綁定的隊(duì)列中。

主題交換器(topic exchange) 此主題非彼(對就是Kafka里的)主題,而更類似wildcard matching。具體來講,綁定關(guān)鍵字是由多個域組成的點(diǎn)號分隔的字符串,每個域可以是實(shí)際的單詞,也可以是通配符,如星號 " * " 表示一個詞,"#" 表示0個或多個詞。在實(shí)際路由時,根據(jù)路由關(guān)鍵字與綁定關(guān)鍵字的匹配結(jié)果來投遞。比如在下圖中,帶有"little.C.magic"關(guān)鍵字的消息會投遞到隊(duì)列1,而帶有"bla.bla.B"關(guān)鍵字的消息會投遞到隊(duì)列2。

頭部交換器(header exchange) AMQP消息與HTTP報(bào)文的格式類似,都有頭部(header)和消息體(body),其中頭部會保存與消息相關(guān)的許多元數(shù)據(jù),消息體才是有效的載荷(payload)。頭部交換器就不依賴綁定關(guān)鍵字和路由關(guān)鍵字的匹配,而是檢查消息頭部中的元數(shù)據(jù)是否匹配,相對而言更加靈活。
根據(jù)AMQP的規(guī)定,交換器的幾個重要屬性有:
名稱(name);
持久性(durable):當(dāng)代理節(jié)點(diǎn)或虛擬主機(jī)重置后,交換器是被保留還是被刪除;
自動刪除(auto-delete):是否在所有隊(duì)列的綁定解除之后被刪除;
擴(kuò)展參數(shù)(arguments)。
如果交換器無法將消息路由到隊(duì)列該怎么辦呢?AMQP給出了幾種解決方法,一是直接丟棄,二是返還給生產(chǎn)者,三是放入死信隊(duì)列中等待進(jìn)一步處理。這由消息頭部中的屬性來決定。
隊(duì)列和消息
隊(duì)列相對而言比較簡單,它的主要功能就是存儲要被消費(fèi)的消息。隊(duì)列也有一些重要的屬性,如下:
名稱(name);
持久性(durable):當(dāng)代理節(jié)點(diǎn)或虛擬主機(jī)重置后,隊(duì)列是被保留還是被刪除;
獨(dú)占性(exclusive):是否只允許被一個連接使用;
自動刪除(auto-delete):是否在所有消費(fèi)者取消訂閱之后被刪除;
擴(kuò)展參數(shù)(arguments):如隊(duì)列緩存長度、消息TTL等。
需要注意,如果一個隊(duì)列是持久的,那么只是代表重啟之后這個隊(duì)列不用重新創(chuàng)建而已,但其中的消息還是有可能被刪除。只有那些被標(biāo)記為persistent的消息才不會被刪除。
AMQP規(guī)范下的隊(duì)列和消費(fèi)者都同時支持推模式和拉模式消費(fèi)。前者即AMQP實(shí)體將消息投遞到消費(fèi)者,后者即消費(fèi)者主動地從隊(duì)列中獲取消息。無論推模式還是拉模式,每個消費(fèi)者也有一個標(biāo)識,稱為tag。
在隊(duì)列中的消息投遞出去之后,消費(fèi)者需要告訴代理節(jié)點(diǎn)自己是否收到了它,因此會涉及消息確認(rèn)(ack)的問題。AMQP默認(rèn)定義了兩種ack機(jī)制:
自動ack:當(dāng)消息從隊(duì)列中出去后就刪除它(即at most once);
顯式ack:當(dāng)消費(fèi)者發(fā)送的確認(rèn)回執(zhí)到達(dá)代理節(jié)點(diǎn)后,再從隊(duì)列中刪除它。如果ack超時,則會再次嘗試投遞(即at least once)。
除了ack之外,消費(fèi)者在處理時有可能會出現(xiàn)問題,或認(rèn)為此消息非法,因此也會出現(xiàn)拒絕消息(reject)的情況。此時代理節(jié)點(diǎn)可以銷毀這條消息,也可以重新將它放入隊(duì)列并投遞給另一個消費(fèi)者。
vs Kafka?
說了這么多,那么Kafka和AMQP有什么關(guān)系呢?答案是沒關(guān)系。
也就是說,Kafka不是消息隊(duì)列。按官方說法,Kafka是一個流式處理平臺(stream processing platform)。Kafka在設(shè)計(jì)之初是為了支持高吞吐量的日志處理的,只不過它恰好也可以實(shí)現(xiàn)消息隊(duì)列的大部分功能而已。Kafka所用的“黑科技”(如零拷貝/內(nèi)存映射,以及對page cache的利用)都是脫離標(biāo)準(zhǔn)消息隊(duì)列的設(shè)計(jì)范疇的,所以不能簡單地認(rèn)為Kafka比RabbitMQ等符合AMQP的消息隊(duì)列更優(yōu)。例如,RabbitMQ支持死信隊(duì)列、延遲隊(duì)列、優(yōu)先隊(duì)列、多租戶、推模式消費(fèi)等,Kafka統(tǒng)統(tǒng)不支持。

版權(quán)聲明:
文章不錯?點(diǎn)個【在看】吧!??




