Rocketmq源碼分析01:搭建源碼調(diào)試環(huán)境
1. 基本架構(gòu)
RocketMQ架構(gòu)上主要分為四部分,如下圖所示:

Producer:消息發(fā)布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應(yīng)的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數(shù)用戶的需求。NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。主要包括兩個功能:NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當(dāng)某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態(tài)感知Broker的路由的信息。Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個 NameServer將保存關(guān)于Broker集群的整個路由信息和用于客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。BrokerServer:Broker主要負責(zé)消息的存儲、投遞和查詢以及服務(wù)高可用保證,為了實現(xiàn)這些功能,Broker包含了以下幾個重要子模塊:Client Manager:負責(zé)管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。HA Service:高可用服務(wù),提供Master Broker和Slave Broker之間的數(shù)據(jù)同步功能。Index Service:根據(jù)特定的Message key對投遞到Broker的消息進行索引服務(wù),以提供消息的快速查詢。Remoting Module:整個Broker的實體,負責(zé)處理來自clients端的請求。

2. 獲取源碼
rocketMq項目的github倉庫為https://github.com/apache/rocketmq.git,由于網(wǎng)絡(luò)原因,我們并不會直接使用github倉庫,而是將其導(dǎo)入到gitee上,只需在gitee創(chuàng)建新倉庫時,選擇導(dǎo)入已有倉庫即可:

導(dǎo)入到gitee后,就可以進行checkout了,本文對應(yīng)的gitee倉庫為https://gitee.com/funcy/rocketmq.git。
checkout源碼到本地后,默認是master分支,本人習(xí)慣基于tag創(chuàng)建自己的分支,然后在自己的分支上進行分析,rocketMq的tag如下:

最新版本是4.8.0,我們將基于此tag創(chuàng)建新分支,使用的命令如下:
# 切換到 rocketmq-all-4.8.0
git checkout rocketmq-all-4.8.0
# 基于 rocketmq-all-4.8.0 創(chuàng)建自己的分析,名稱為 rocketmq-all-4.8.0-LEARN
git checkout -b rocketmq-all-4.8.0-LEARN
# 將 rocketmq-all-4.8.0-LEARN 分支推送到遠程倉庫
git push -u origin rocketmq-all-4.8.0-LEARN
接下來,我們所有的操作都是在rocketmq-all-4.8.0-LEARN分支上進行了。
3. 本地啟動
拿到代碼后,我們就開始進行本地啟動了,沒錯,就是在idea中進行啟動。
3.1 復(fù)制conf目錄
在啟動項目前,我們需要進行一些配置,rocketMq項目的配置文件位于rocketmq/distribution模塊下的conf目錄中,直接整個復(fù)制到rocketmq目錄下:

也不需要改動,復(fù)制出來就行了,這些配置的內(nèi)容后面分析源碼時再講解吧。
3.2 啟動nameServer
nameServer的主類為org.apache.rocketmq.namesrv.NamesrvStartup:

如果我們直接運行main()方法,會報錯:

報錯信息已經(jīng)很明確了,需要我們配置ROCKETMQ_HOME目錄,我們在idea中進行配置即可:
打開配置界面:

填寫ROCKETMQ_HOME配置:

這里我填寫的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq,這個ROCKETMQ_HOME路徑就是conf文件夾所在的目錄。
填寫好后,就可以啟動了:

3.3 啟動broker
broker的主類為org.apache.rocketmq.broker.BrokerStartup,啟動方式與nameServer很相似,啟動前也要配置ROCKETMQ_HOME路徑:

相比于nameServer,這里多配置了啟動參數(shù):
-n localhost:9876 autoCreateTopicEnable=true
這個啟動參數(shù)是指定nameServer的地址,以及開啟自動創(chuàng)建topic的功能。
配置完成之后就可以啟動了:

3.4 啟動管理后臺
rocketMq的管理后臺在另一個倉庫https://github.com/apache/rocketmq-externals,除了后臺,這個倉庫還包含了許多的其他模塊:

我們并不需要分析這個項目,源碼本可以不必下載,但我在找這個項目的release版本時,發(fā)現(xiàn)并沒有提供已編譯好的jar包,需要自己構(gòu)建代碼,因此我就再次下載了這個代碼源碼。當(dāng)然,由于網(wǎng)絡(luò)的原因,這個項目的源碼也被我導(dǎo)入到了gitee上,地址為https://gitee.com/funcy/rocketmq-externals.git.
這個項目的代碼我們并不分析,因此直接在master分支上操作即可,
管理后臺項目為rocketmq-console,主類為org.apache.rocketmq.console.App:

在啟動前,我們需要修改下application.properties的配置,找到rocketmq.config.namesrvAddr配置,添加nameServer的ip與端口,這里我們連接的是本地應(yīng)用,直接填寫localhost:9876:
...
rocketmq.config.namesrvAddr=localhost:9876
...
啟動,結(jié)果如下:

訪問http://localhost:8080,結(jié)果如下:

可以看到broker已經(jīng)出現(xiàn)在cluster列表中了,這就表明啟動成功了。
4. 收發(fā)消息測試
rocketMq項目的example模塊下有大量的測試示例,我們選擇其一進行消息收發(fā)測試。
4.1 啟動Consumer
我們先找到org.apache.rocketmq.example.simple.PushConsumer,代碼如下:
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
這個Consumer監(jiān)聽的topic是TopicTest,后面我們就會往這個topic發(fā)送消息。另外,需要注意nameServer的配置,我們是在本地啟動的nameServer,因此這里配置的是localhost:9876。
運行main()方法,結(jié)果如下:

4.2 啟動Producer
我們找到 org.apache.rocketmq.example.simple.Producer 類,代碼如下:
public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
同樣地,這里使用的是的nameServer地址是localhost:9876,topic 是TopicTest,運行,結(jié)果如下:

再回過頭看看PushConsumer的控制臺:

可以看到,Producer發(fā)送消息成功了,PushConsumer也成功獲取到消息了。
4.3 異常分析
如圖所示:

如果出現(xiàn)異常:
org.apache.rocketmq.client.exception.MQClientException:
No route info of this topic: TopicTest
這表明當(dāng)前broker中沒有TopicTest的topic,這時我們可以手動創(chuàng)建topic,也可以在啟動時指定autoCreateTopicEnable=true.
如果是按上面步驟進行的,請確認下org.apache.rocketmq.broker.BrokerStartup是否配置啟動參數(shù)
-n localhost:9876 autoCreateTopicEnable=true
配置方式就按3.3節(jié)的方式配置就行了。
5. 總結(jié)
本文主要介紹了rocketMq的基本架構(gòu),通過源碼展示了rocketMq的啟動方式,最后通過rocketMq項目下example模塊中的測試代碼展示了消息的收發(fā)過程。
總的來說,本文還是在準備源碼分析的環(huán)境,下篇文章開始,我們就正式開始rocketMq的源碼分析了。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!
