10分鐘掌握RocketMQ的核心知識
Spring Boot 作為主流微服務框架,擁有成熟的社區(qū)生態(tài)。市場應用廣泛,為了方便大家,整理了一個基于spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊列、分庫分表、注冊中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會開放出來,感興趣同學請?zhí)崆瓣P注&收藏
前言
Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分布式消息中間件。
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產(chǎn)消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于集群中的不同的Broker Group。
快速安裝:https://rocketmq.apache.org/docs/quick-start/
源代碼:https://github.com/apache/rocketmq-spring
主要功能:
1、業(yè)務解耦。采用發(fā)布訂閱模式,生產(chǎn)端發(fā)送消息到MQ Server,下游的消費端訂閱接收消息。異步形式,系統(tǒng)解耦,提升系統(tǒng)擴展性
2、削峰限流。由于消息中間件的吞吐量很高,過量的請求會暫時放在 MQ server,下游慢慢消費,避免過量請求沖垮系統(tǒng)
3、億級消息的堆積能力,單個隊列中的百萬級消息的累積容量。
4、高可用性:Broker服務器支持多Master多Slave的同步雙寫以及Master多Slave的異步復制模式,其中同步雙寫可保證消息不丟失。
5、高可靠性:生產(chǎn)者將消息發(fā)送到Broker端有三種方式,同步、異步和單向。Broker在對于消息刷盤有兩種策略:同步刷盤和異步刷盤,其中同步刷盤可以保證消息成功的存儲到磁盤中。消費者的消費模式也有集群消費和廣播消費兩種,默認集群消費,如果集群模式中消費者掛了,一個組里的其他消費者會接替其消費。
6、分布式事務消息:這里是采用半消息確認和消息回查機制來保證分布式事務消息。
7、支持消息過濾:建議采用消費者業(yè)務端的tag過濾
8、支持順序消息:消息在Broker中是采用隊列的FIFO模式存儲的,也就是發(fā)送是順序的,只要保證消費的順序性即可。
9、支持定時消息和延遲消息:Broker中由定時消息的機制,消息發(fā)送到Broker中,不會立即被Consumer消費,會等到一定的時間才被消費。延遲消息也是一樣,延遲一定時間之后才會被Consumer消費。
核心組件:
1、Namesrv
Namesrv充當路由消息的提供者。Namesrv是一個幾乎無狀態(tài)節(jié)點,多個Namesrv實例組成集群,但相互獨立,沒有信息交換。Namesrv主要作用是:為producer和consumer提供關于topic的路由信息。管理broker節(jié)點:監(jiān)控更新broker的實時狀態(tài)。路由注冊、路由刪除(故障剔除)。
2、Broker
負責存儲消息、轉(zhuǎn)發(fā)消息。Broker是以group為單位提供服務。一個group里面分Master和Slave。Master和Slave存儲的數(shù)據(jù)一樣,slave從master同步數(shù)據(jù)(同步雙寫或異步復制看配置)。一個Master可以對應多個Slave,一個Slave只能對應一個Master。Master與Slave的對應關系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。
基本概念:
1、主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。每個topic可分為若干個分區(qū)(queue)
2、生產(chǎn)者組(Producer Group) 同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務器會聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實例以提交或回溯消費。
3、消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現(xiàn)負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
4、普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
5、嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有序的。
6、消息(Message) 消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業(yè)務標識的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。
7、標簽(Tag) 為消息設置的標志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務單元的消息,可以根據(jù)不同業(yè)務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消費邏輯,實現(xiàn)更好的擴展性。
RocketMQ 特性:
- 同步發(fā)送
- 異步發(fā)送
- 單向方式發(fā)送
- 發(fā)送有序消息
- 發(fā)送批量消息
- 發(fā)送事務信息
- 發(fā)送延遲消息
- 消費有序消息
- 使用標簽或sql92表達式過濾消息
- 支持消息跟蹤
- 支持身份驗證和授權
- 支持請求-回復消息交換模式
- 消費消息支持推、拉模式
代碼演示
外部依賴:
spring boot 已經(jīng)為RocketMQ 封裝了starter組件,只需在 pom.xml 文件中添加jar版本依賴即可:
<dependency>
????<groupId>org.apache.rocketmq</groupId>
????<artifactId>rocketmq-spring-boot-starter</artifactId>
????<version>2.0.3</version>
</dependency>
配置文件:
在配置文件 application.yaml 中配置 RocketMQ 的相關參數(shù),具體內(nèi)容如下:
rocketmq:
??name-server:?localhost:9876
??consumer:
????topic:?maker-order-topic
????group:?my-group1
??producer:
????group:?p-my-group1
消息生產(chǎn)端:
@Resource
private?RocketMQTemplate?rocketMQTemplate;
private?static?String?makerOrderTopic?=?"maker-order-topic";
@GetMapping("/send_make_order_message")
public?Object?send_make_order_message()?{
????try?{
????????Long?orderId?=?Long.valueOf(new?Random().nextInt(1000000));
????????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).buyerUid(200000L).amount(26.8).shippingAddress("上海").build();
????????SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?orderModel);
????????System.out.printf("Send?message?to?topic?%s?,?sendResult=%s?%n",?makerOrderTopic,?sendResult);
????????return?"消息發(fā)送成功";
????}?catch?(Exception?e)?{
????????e.printStackTrace();
????????return?"消息發(fā)送失敗";
????}
}
消息消費端:
@Service
@RocketMQMessageListener(nameServer?=?"${rocketmq.name-server}",?topic?=?"${rocketmq.consumer.topic}",?consumerGroup?=?"${rocketmq.consumer.group}")
public?class?OrderConsumer?implements?RocketMQListener<OrderModel>?{
????@Override
????public?void?onMessage(OrderModel?orderModel)?{
????????System.out.printf("consumer?received?message:?%s?\n",?JSON.toJSONString(orderModel));
????}
}
操作演示
瀏覽器訪問:http://localhost:9071/send_make_order_message,模擬生產(chǎn)端發(fā)送消息到MQ Server中。
消費端接收消息日志:
Send?message?to?topic?maker-order-topic?,?sendResult=SendResult?[sendStatus=SEND_OK,?msgId=C0A80069816F14DAD5DC73A75B9F0014,?offsetMsgId=C0A8006900002A9F0000000000058841,?messageQueue=MessageQueue?[topic=maker-order-topic,?brokerName=192.168.0.105,?queueId=2],?queueOffset=0]?
consumer?received?message:?{"amount":26.8,"buyerUid":200000,"orderId":895586,"shippingAddress":"上海"}?
其他消息類型如何發(fā)送
1、同步發(fā)送
同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后,在收到服務端返回響應后,線程才會執(zhí)行后續(xù)代碼
OrderModel?orderModel?=?mockOrderModel();
Message?message?=?new?Message(makerOrderTopic,?"TageA",?JSON.toJSONString(orderModel).getBytes());
SendResult?sendResult?=?rocketMQTemplate.getProducer().send(message);
2、異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不需要等服務端返回響應。異步發(fā)送,需要實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),通過回調(diào)接口接收服務端響應,并處理結果
OrderModel?orderModel?=?mockOrderModel();
rocketMQTemplate.asyncSend(makerOrderTopic,?orderModel,?new?SendCallback()?{
????@Override
????public?void?onSuccess(SendResult?sendResult)?{
????????System.out.println("消息發(fā)送成功,msgId="?+?sendResult.getMsgId());
????}
????@Override
????public?void?onException(Throwable?throwable)?{
????????System.out.println("發(fā)送失敗,"?+?throwable);
????}
});
3、順序消息
對于指定的一個Topic,所有消息根據(jù)Sharding Key分區(qū)。同一個分區(qū)內(nèi)的消息按照嚴格的FIFO順序進行發(fā)布和消費。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關鍵字段,和普通消息的Key是完全不同的概念。
比如:電商的訂單創(chuàng)建,以訂單ID作為Sharding Key,那么同一個訂單相關的消息,如創(chuàng)建訂單、付款、發(fā)貨、訂單退款消息、訂單物流消息都會按照發(fā)布的先后順序來消費。
for?(long?orderId?=?0;?orderId?<?20;?orderId++)?{
????String?shardingKey?=?String.valueOf(orderId?%?5);
????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).build();
????SendResult?sendResult?=?rocketMQTemplate.syncSendOrderly(makerOrderTopic,?orderModel,?shardingKey);
????if?(sendResult?!=?null)?{
????????System.out.println(orderId?+?"?,發(fā)送成功");
????}
}
4、延時消息
Producer將消息發(fā)送到消息隊列RocketMQ服務端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進行消費,該消息稱為延時消息。
OrderModel?orderModel?=?mockOrderModel();
org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel).getBytes()).build();
//延時等級?3,?這個消息將在10s之后發(fā)送,現(xiàn)在只支持固定的幾個時間值
//delayTimeLevel?=?"1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h";
SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?message,?8000,?3);
5、事務消息
RocketMQ提供類似X/Open XA的分布式事務功能,通過消息隊列RocketMQ事務消息能達到分布式事務的最終一致。
由于網(wǎng)絡閃斷、生產(chǎn)者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列RocketMQ服務端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務消息”時,主動向生產(chǎn)者查詢該消息的最終狀態(tài)(Commit或Rollback),該過程稱之為消息回查。
典型場景:在電商購物車下單時,涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務消息的異步處理實現(xiàn)。在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。而購物車系統(tǒng)只需要訂閱消息隊列RocketMQ的交易訂單消息,做相應的業(yè)務處理,即可保證最終的數(shù)據(jù)一致性。
發(fā)送步驟:
- 發(fā)送方將半事務消息發(fā)送至MQ Server。
- MQ服務端將消息持久化成功之后,向發(fā)送方返回Ack確認消息已經(jīng)發(fā)送成功,此時消息為半事務消息。
- 發(fā)送方開始執(zhí)行本地事務邏輯
- 發(fā)送方根據(jù)本地事務執(zhí)行結果向服務端提交二次確認(Commit或Rollback),服務端收到
Commit狀態(tài)則將半事務消息標記為可投遞,訂閱方將收到該消息;服務端收到Rollback狀態(tài)則刪除半事務消息,訂閱方不會收到該消息。
回查步驟:
- 在斷網(wǎng)或者應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經(jīng)過固定時間后服務端將對該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。
- 發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟4對半事務消息進行操作。
發(fā)送半事務消息,示例代碼如下:
OrderModel?orderModel?=?mockOrderModel();
org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel)).build();
TransactionSendResult?transactionSendResult?=?rocketMQTemplate.sendMessageInTransaction("tx_order_message",?makerOrderTopic,?message,?null);
SendStatus?sendStatus?=?transactionSendResult.getSendStatus();
LocalTransactionState?localTransactionState?=?transactionSendResult.getLocalTransactionState();
System.out.println("send message status:?"?+?sendStatus?+?" , localTransactionState:?"?+?localTransactionState);
編寫RocketMQLocalTransactionListener接口實現(xiàn)類,實現(xiàn)執(zhí)行本地事務和事務回查兩個方法。
@Component
@RocketMQTransactionListener(txProducerGroup?=?"tx_order_message")
public?class?TXProducerListener?implements?RocketMQLocalTransactionListener?{
????@Override
????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?message,?Object?arg)?{
????????//?執(zhí)行本地事務
????????System.out.println("TXProducerListener 開始執(zhí)行本地事務。。。");
????????RocketMQLocalTransactionState?result;
????????try?{
????????????//?模擬業(yè)務處理(?如:創(chuàng)建訂單?)
????????????//?int?i?=?1?/?0;??//模擬異常
????????????result?=?RocketMQLocalTransactionState.COMMIT;??//?成功
????????}?catch?(Exception?e)?{
????????????System.out.println("本地事務執(zhí)行失敗。。。");
????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
????????}
????????return?result;
????}
????@Override
????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{
????????//?檢查本地事務(?例如檢查下訂單是否成功?)
????????System.out.println("檢查本地事務。。。");
????????RocketMQLocalTransactionState?result;
????????try?{
????????????//模擬業(yè)務處理(?根據(jù)檢查結果,決定是COMMIT或ROLLBACK?)
????????????result?=?RocketMQLocalTransactionState.COMMIT;
????????}?catch?(Exception?e)?{
????????????//?異常就回滾
????????????System.out.println("檢查本地事務?error");
????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
????????}
????????return?result;
????}
}
演示代碼地址
https://github.com/aalansehaiyang/spring-boot-bulking??
模塊:spring-boot-bulking-rocketmq
面試官一般喜歡考察哪些知識點
1、如何保證順序消息?
順序由producer發(fā)送到broker的消息隊列是滿足FIFO的,所以發(fā)送是順序的,單個queue里的消息是順序的。多個Queue同時消費是無法絕對保證消息的有序性的。所以,同一個topic,同一個queue,發(fā)消息的時候一個線程發(fā)送消息,消費的時候一個線程去消費一個queue里的消息。
2、怎么保證消息發(fā)到同一個queue里?
RocketMQ給我們提供了MessageQueueSelector接口,可以重寫里面的接口,實現(xiàn)自己的算法,比如判斷i%2==0,那就發(fā)送消息到queue1否則發(fā)送到queue2。
3、如何實現(xiàn)消息過濾?
有兩種方案,一種是在broker端按照Consumer的去重邏輯進行過濾,這樣做的好處是避免了無用的消息傳輸?shù)紺onsumer端,缺點是加重了Broker的負擔,實現(xiàn)起來相對復雜。另一種是在Consumer端過濾,比如按照消息設置的tag去重,這樣的好處是實現(xiàn)起來簡單,缺點是有大量無用的消息到達了Consumer端只能丟棄不處理。
4、如果由于網(wǎng)絡等原因,多條重復消息投遞到了Consumer端,你怎么進行消息去重?
這個得先說下消息的冪等性原則:就是用戶對于同一種操作發(fā)起的多次請求的結果是一樣的,不會因為操作了多次就產(chǎn)生不一樣的結果。只要保持冪等性,不管來多少條消息,最后處理結果都一樣,需要Consumer端自行實現(xiàn)。
去重的方案:因為每個消息都有一個MessageId, 保證每個消息都有一個唯一鍵,可以是數(shù)據(jù)庫的主鍵或者唯一約束,也可以是Redis緩存中的鍵,當消費一條消息前,先檢查數(shù)據(jù)庫或緩存中是否存在這個唯一鍵,如果存在就不再處理這條消息,如果消費成功,要保證這個唯一鍵插入到去重表中。
5、RocketMQ是怎么實現(xiàn)分布式事務消息的?
- Producer向broker發(fā)送半消息
- Producer端收到響應,消息發(fā)送成功,此時消息是半消息,標記為“不可投遞”狀態(tài),Consumer消費不了。
- Producer端執(zhí)行本地事務。
- 正常情況本地事務執(zhí)行完成,Producer向Broker發(fā)送Commit/Rollback,如果是Commit,Broker端將半消息標記為正常消息,Consumer可以消費,如果是Rollback,Broker丟棄此消息。
- 異常情況,Broker端遲遲等不到二次確認。在一定時間后,會查詢所有的半消息,然后到Producer端查詢半消息的執(zhí)行情況。
- Producer 端查詢本地事務的狀態(tài)
- 根據(jù)事務的狀態(tài)提交commit/rollback到broker端。
6、從Producer角度分析,如何確保消息成功發(fā)送到了Broker?
采用同步發(fā)送,即發(fā)送一條數(shù)據(jù)等到接受者返回響應之后再發(fā)送下一個數(shù)據(jù)包。如果返回響應OK,表示消息成功發(fā)送到了broker,狀態(tài)超時或者失敗都會觸發(fā)二次重試。MQ Server端會有冪等控制。
可以采用分布式事務消息的投遞方式。
如果一條消息發(fā)送之后超時,也可以通過查詢?nèi)罩镜腁PI,來檢查是否在Broker存儲成功。總的來說,Producer還是采用同步發(fā)送來保證的。
7、從Broker角度分析,如何確保消息持久化?
- 消息只要持久化到CommitLog(日志文件)中,即使Broker宕機,未消費的消息也能重新恢復再消費。
- Broker的刷盤機制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲在page cache,但是同步刷盤更可靠,它是Producer發(fā)送消息后等數(shù)據(jù)持久化到磁盤之后再返回響應給Producer。
- Broker支持多Master多Slave同步雙寫和多Master多Slave異步復制模式,消息都是發(fā)送給Master主機,但是消費既可以從Master消費,也可以從Slave消費。同步雙寫模式可以保證即使Master宕機,消息肯定在Slave中有備份,保證了消息不會丟失。
8、從Consumer角度分析,如何保證消息被成功消費?
Consumer自身維護了個持久化的offset(對應Message Queue里的min offset),用來標記已經(jīng)成功消費且已經(jīng)成功發(fā)回Broker的消息下標。如果Consumer消費失敗,它會向Broker發(fā)回消費失敗的狀態(tài),發(fā)回成功才會更新自己的offset。如果發(fā)回給broker時broker掛掉了,Consumer會定時重試,如果Consumer和Broker一起掛掉了,消息還在Broker端存儲著,Consumer端的offset也是持久化的,重啟之后繼續(xù)拉取offset之前的消息進行消費。
Spring Boot 集成 Kafka
歡迎關注微信公眾號:互聯(lián)網(wǎng)全棧架構,收取更多有價值的信息。
