1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Rocketmq源碼分析01:搭建源碼調(diào)試環(huán)境

        共 9010字,需瀏覽 19分鐘

         ·

        2021-04-02 11:25

        1. 基本架構(gòu)

        RocketMQ架構(gòu)上主要分為四部分,如下圖所示:

        • Producer:消息發(fā)布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應(yīng)的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。

        • Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數(shù)用戶的需求。

        • NameServerNameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。主要包括兩個功能:

          NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當(dāng)某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態(tài)感知Broker的路由的信息。

          • Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機制,檢查Broker是否還存活;
          • 路由信息管理,每個NameServer將保存關(guān)于Broker集群的整個路由信息和用于客戶端查詢的隊列信息。然后ProducerConumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。
        • BrokerServerBroker主要負責(zé)消息的存儲、投遞和查詢以及服務(wù)高可用保證,為了實現(xiàn)這些功能,Broker包含了以下幾個重要子模塊:

          • Client Manager:負責(zé)管理客戶端(Producer/Consumer)和維護ConsumerTopic訂閱信息
          • Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
          • HA Service:高可用服務(wù),提供Master BrokerSlave Broker之間的數(shù)據(jù)同步功能。
          • Index Service:根據(jù)特定的Message key對投遞到Broker的消息進行索引服務(wù),以提供消息的快速查詢。
          • Remoting Module:整個Broker的實體,負責(zé)處理來自clients端的請求。

        2. 獲取源碼

        rocketMq項目的github倉庫為https://github.com/apache/rocketmq.git,由于網(wǎng)絡(luò)原因,我們并不會直接使用github倉庫,而是將其導(dǎo)入到gitee上,只需在gitee創(chuàng)建新倉庫時,選擇導(dǎo)入已有倉庫即可:

        導(dǎo)入到gitee后,就可以進行checkout了,本文對應(yīng)的gitee倉庫為https://gitee.com/funcy/rocketmq.git。

        checkout源碼到本地后,默認是master分支,本人習(xí)慣基于tag創(chuàng)建自己的分支,然后在自己的分支上進行分析,rocketMqtag如下:

        最新版本是4.8.0,我們將基于此tag創(chuàng)建新分支,使用的命令如下:

        # 切換到 rocketmq-all-4.8.0
        git checkout rocketmq-all-4.8.0
        # 基于 rocketmq-all-4.8.0 創(chuàng)建自己的分析,名稱為 rocketmq-all-4.8.0-LEARN
        git checkout -b rocketmq-all-4.8.0-LEARN
        # 將 rocketmq-all-4.8.0-LEARN 分支推送到遠程倉庫
        git push -u origin rocketmq-all-4.8.0-LEARN

        接下來,我們所有的操作都是在rocketmq-all-4.8.0-LEARN分支上進行了。

        3. 本地啟動

        拿到代碼后,我們就開始進行本地啟動了,沒錯,就是在idea中進行啟動。

        3.1 復(fù)制conf目錄

        在啟動項目前,我們需要進行一些配置,rocketMq項目的配置文件位于rocketmq/distribution模塊下的conf目錄中,直接整個復(fù)制到rocketmq目錄下:

        也不需要改動,復(fù)制出來就行了,這些配置的內(nèi)容后面分析源碼時再講解吧。

        3.2 啟動nameServer

        nameServer的主類為org.apache.rocketmq.namesrv.NamesrvStartup

        如果我們直接運行main()方法,會報錯:

        報錯信息已經(jīng)很明確了,需要我們配置ROCKETMQ_HOME目錄,我們在idea中進行配置即可:

        打開配置界面:

        填寫ROCKETMQ_HOME配置:

        這里我填寫的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq,這個ROCKETMQ_HOME路徑就是conf文件夾所在的目錄。

        填寫好后,就可以啟動了:

        3.3 啟動broker

        broker的主類為org.apache.rocketmq.broker.BrokerStartup,啟動方式與nameServer很相似,啟動前也要配置ROCKETMQ_HOME路徑:

        相比于nameServer,這里多配置了啟動參數(shù):

        -n localhost:9876 autoCreateTopicEnable=true

        這個啟動參數(shù)是指定nameServer的地址,以及開啟自動創(chuàng)建topic的功能。

        配置完成之后就可以啟動了:

        3.4 啟動管理后臺

        rocketMq的管理后臺在另一個倉庫https://github.com/apache/rocketmq-externals,除了后臺,這個倉庫還包含了許多的其他模塊:

        我們并不需要分析這個項目,源碼本可以不必下載,但我在找這個項目的release版本時,發(fā)現(xiàn)并沒有提供已編譯好的jar包,需要自己構(gòu)建代碼,因此我就再次下載了這個代碼源碼。當(dāng)然,由于網(wǎng)絡(luò)的原因,這個項目的源碼也被我導(dǎo)入到了gitee上,地址為https://gitee.com/funcy/rocketmq-externals.git.

        這個項目的代碼我們并不分析,因此直接在master分支上操作即可,

        管理后臺項目為rocketmq-console,主類為org.apache.rocketmq.console.App

        在啟動前,我們需要修改下application.properties的配置,找到rocketmq.config.namesrvAddr配置,添加nameServer的ip與端口,這里我們連接的是本地應(yīng)用,直接填寫localhost:9876

        ...
        rocketmq.config.namesrvAddr=localhost:9876
        ...

        啟動,結(jié)果如下:

        訪問http://localhost:8080,結(jié)果如下:

        可以看到broker已經(jīng)出現(xiàn)在cluster列表中了,這就表明啟動成功了。

        4. 收發(fā)消息測試

        rocketMq項目的example模塊下有大量的測試示例,我們選擇其一進行消息收發(fā)測試。

        4.1 啟動Consumer

        我們先找到org.apache.rocketmq.example.simple.PushConsumer,代碼如下:

        public class PushConsumer {

            public static void main(String[] args) 
                    throws InterruptedException, MQClientException 
        {
                String nameServer = "localhost:9876";
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
                consumer.setNamesrvAddr(nameServer);
                consumer.subscribe("TopicTest""*");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                //wrong time format 2017_0422_221800
                consumer.setConsumeTimestamp("20181109221800");
                consumer.registerMessageListener(new MessageListenerConcurrently() {

                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                            ConsumeConcurrentlyContext context)
         
        {
                        System.out.printf("%s Receive New Messages: %s %n"
                            Thread.currentThread().getName(), msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                consumer.start();
                System.out.printf("Consumer Started.%n");
            }
        }

        這個Consumer監(jiān)聽的topicTopicTest,后面我們就會往這個topic發(fā)送消息。另外,需要注意nameServer的配置,我們是在本地啟動的nameServer,因此這里配置的是localhost:9876。

        運行main()方法,結(jié)果如下:

        4.2 啟動Producer

        我們找到 org.apache.rocketmq.example.simple.Producer 類,代碼如下:

        public class Producer {

            public static void main(String[] args) 
                    throws MQClientException, InterruptedException 
        {
                String nameServer = "localhost:9876";
                DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
                producer.setNamesrvAddr(nameServer);
                producer.start();

                for (int i = 0; i < 10; i++)
                    try {
                        {
                            Message msg = new Message("TopicTest",
                                "TagA",
                                "OrderID188",
                                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                            SendResult sendResult = producer.send(msg);
                            System.out.printf("%s%n", sendResult);
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                producer.shutdown();
            }
        }

        同樣地,這里使用的是的nameServer地址是localhost:9876topicTopicTest,運行,結(jié)果如下:

        再回過頭看看PushConsumer的控制臺:

        可以看到,Producer發(fā)送消息成功了,PushConsumer也成功獲取到消息了。

        4.3 異常分析

        如圖所示:

        如果出現(xiàn)異常:

        org.apache.rocketmq.client.exception.MQClientException: 
        No route info of this topic: TopicTest

        這表明當(dāng)前broker中沒有TopicTesttopic,這時我們可以手動創(chuàng)建topic,也可以在啟動時指定autoCreateTopicEnable=true.

        如果是按上面步驟進行的,請確認下org.apache.rocketmq.broker.BrokerStartup是否配置啟動參數(shù)

        -n localhost:9876 autoCreateTopicEnable=true

        配置方式就按3.3節(jié)的方式配置就行了。

        5. 總結(jié)

        本文主要介紹了rocketMq的基本架構(gòu),通過源碼展示了rocketMq的啟動方式,最后通過rocketMq項目下example模塊中的測試代碼展示了消息的收發(fā)過程。

        總的來說,本文還是在準備源碼分析的環(huán)境,下篇文章開始,我們就正式開始rocketMq的源碼分析了。


        限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

        本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!


        瀏覽 51
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            青青草成人网站 | AV乱伦网站 | 仙踪林一级婬片A片 | 亚洲免费视频在线播放 | DHDHDH18-19XXXX | 9丨PORNY九色9l祝频 | 伊人九九九 | 日日躁久久躁熟妇高潮喷 | 韩国三级久久 | 黄片一区|