1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Rocketmq源碼分析12:consumer 負(fù)載均衡

        共 25136字,需瀏覽 51分鐘

         ·

        2021-04-28 00:29

        注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.

        接上文,繼續(xù)分析consumer消費(fèi)流程。

        5. 如何選擇消息隊(duì)列:RebalanceService

        讓我們回到PullMessageService#run()方法:

        public class PullMessageService extends ServiceThread {

            ...

            private final LinkedBlockingQueue<PullRequest> pullRequestQueue 
                = new LinkedBlockingQueue<PullRequest>();

            /**
             * 將 pullRequest 放入 pullRequestQueue 中
             */

            public void executePullRequestImmediately(final PullRequest pullRequest) {
                try {
                    this.pullRequestQueue.put(pullRequest);
                } catch (InterruptedException e) {
                    log.error("executePullRequestImmediately pullRequestQueue.put", e);
                }
            }

            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");

                while (!this.isStopped()) {
                    try {
                        // 從 pullRequestQueue 獲取一個(gè) pullRequest,阻塞的方式
                        PullRequest pullRequest = this.pullRequestQueue.take();
                        this.pullMessage(pullRequest);
                    } catch (InterruptedException ignored) {
                    } catch (Exception e) {
                        log.error("Pull Message Service Run Method exception", e);
                    }
                }

                log.info(this.getServiceName() + " service end");
            }

            ...
        }

        PullMessageService線程獲得了pullRequest后,然后就開始了一次又一次的拉起消息的操作,那這個(gè)pullRequest最初是在哪里添加進(jìn)來(lái)的呢?這就是本節(jié)要分析的「負(fù)載均衡」功能了。

        處理負(fù)載均衡的線程為RebalanceService,它是在MQClientInstance#start方法中啟動(dòng)的,我們直接進(jìn)入其run()方法:

        public class RebalanceService extends ServiceThread {

            // 省略其他
            ...

            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");

                while (!this.isStopped()) {
                    this.waitForRunning(waitInterval);
                    this.mqClientFactory.doRebalance();
                }

                log.info(this.getServiceName() + " service end");
            }
        }

        在它的run()方法中,僅是調(diào)用了MQClientInstance#doRebalance方法,我們繼續(xù)進(jìn)入:

        public void doRebalance() {
            // consumerTable 存放的就是當(dāng)前 consumer
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }

        MQClientInstance#doRebalance方法中,會(huì)遍歷所有的consumer,然后調(diào)用DefaultMQPushConsumerImpl#doRebalance方法作進(jìn)一步的處理,consumerTable就是用來(lái)保存DefaultMQPushConsumerImpl實(shí)例的,繼續(xù)進(jìn)入DefaultMQPushConsumerImpl#doRebalance方法:

        @Override
        public void doRebalance() {
            if (!this.pause) {
                this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
            }
        }

        繼續(xù)跟進(jìn),來(lái)到RebalanceImpl#doRebalance方法:

        public void doRebalance(final boolean isOrder) {
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        // 客戶端負(fù)載均衡:根據(jù)主題來(lái)處理負(fù)載均衡
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }

            this.truncateMessageQueueNotMyTopic();
        }

        /**
         * 這就是最張?zhí)幚碡?fù)載均衡的地方了
         */

        private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                // 廣播模式:不需要處理負(fù)載均衡,每個(gè)消費(fèi)者都要消費(fèi),只需要更新負(fù)載信息
                case BROADCASTING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    if (mqSet != null) {
                        // 更新負(fù)載均衡信息,這里傳入的參數(shù)是mqSet,即所有隊(duì)列
                        boolean changed = this
                            .updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, mqSet);
                            log.info(...);
                        }
                    } else {
                        log.warn(...);
                    }
                    break;
                }
                // 集群模式
                case CLUSTERING: {
                    // 根據(jù)訂閱的主題獲取消息隊(duì)列
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    // 客戶端id,根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
                    List<String> cidAll = this.mQClientFactory
                        .findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn(...);
                        }
                    }

                    if (null == cidAll) {
                        log.warn(...);
                    }

                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
                        // 排序后才能保證消費(fèi)者負(fù)載策略相對(duì)穩(wěn)定
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
                        // MessageQueue 的負(fù)載策略
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                        List<MessageQueue> allocateResult = null;
                        try {
                            // 按負(fù)載策略進(jìn)行分配,返回當(dāng)前消費(fèi)者實(shí)際訂閱的messageQueue集合
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            log.error(...);
                            return;
                        }

                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }

                        // 更新負(fù)載均衡信息,傳入?yún)?shù)是 allocateResultSet,即當(dāng)前consumer分配到的隊(duì)列
                        boolean changed = this
                            .updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(...);
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }

        RebalanceImpl#rebalanceByTopic方法就是最終處理負(fù)載均衡的方法了,在這個(gè)方法里會(huì)區(qū)分廣播模式與集群模式的處理。

        在廣播模式下,一條消息會(huì)被同一個(gè)消費(fèi)組中的所有consumer消費(fèi),而集群模式下,一條消息只會(huì)被同一個(gè)消費(fèi)組下的一個(gè)consumer消費(fèi)。

        正是因?yàn)槿绱耍瑥V播模式下并沒有負(fù)載均衡可言,直接把所有的隊(duì)列都分配給當(dāng)前consumer處理,然后更新QueueTable的負(fù)載均衡信息;而集群模式會(huì)先分配當(dāng)前consumer消費(fèi)的消息隊(duì)列,再更新QueueTable的負(fù)載均衡信息。

        這里我們來(lái)看看集群模式,看看它的操作:

        1. strategy.allocate(...):按負(fù)載均衡策略為當(dāng)前consumer分配隊(duì)列
        2. updateProcessQueueTableInRebalance(...):更新負(fù)載均衡信息。

        rocketMq中,提供了這些負(fù)載均衡策略:

        • AllocateMessageQueueAveragely:平均負(fù)載策略,rocketMq默認(rèn)使用的策略
        • AllocateMessageQueueAveragelyByCircle:環(huán)形平均分配,這個(gè)和平均分配唯一的區(qū)別就是,再分隊(duì)列的時(shí)候,平均隊(duì)列是將屬于自己的MessageQueue全部拿走,而環(huán)形平均則是,一人拿一個(gè),拿到的Queue不是連續(xù)的。
        • AllocateMessageQueueByConfig:用戶自定義配置
        • AllocateMessageQueueByMachineRoom:同機(jī)房負(fù)載策略,這個(gè)策略就是當(dāng)前Consumer只負(fù)載處在指定的機(jī)房?jī)?nèi)的MessageQueue,brokerName的命名必須要按要求的格式來(lái)設(shè)置:機(jī)房名@brokerName
        • AllocateMachineRoomNearby:就近機(jī)房負(fù)載策略,在AllocateMessageQueueByMachineRoom策略中,如果同一機(jī)房中只有MessageQueue而沒有consumer,那這個(gè)MessageQueue上的消息該如何消費(fèi)呢?AllocateMachineRoomNearby就是擴(kuò)充了該功能的處理
        • AllocateMessageQueueConsistentHash:一致性哈希策略

        這里我們重點(diǎn)來(lái)分析平均負(fù)載策略AllocateMessageQueueAveragely

        public List<MessageQueue> allocate(String consumerGroup, String currentCID, 
                List<MessageQueue> mqAll, List<String> cidAll)
         
        {
            // 返回值        
            List<MessageQueue> result = new ArrayList<MessageQueue>();

            // 省略一些判斷操作
            ...


            int index = cidAll.indexOf(currentCID);
            int mod = mqAll.size() % cidAll.size();
            // 1. 消費(fèi)者數(shù)量大于隊(duì)列數(shù)量:averageSize = 1
            // 2. 消費(fèi)者數(shù)量小于等于隊(duì)列數(shù)量:averageSize = 隊(duì)列數(shù)量 / 消費(fèi)者數(shù)量,還要處理個(gè)+1的操作
            int averageSize = mqAll.size() <= cidAll.size() 
                ? 1 : (mod > 0 && index < mod 
                    ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
            int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
            int range = Math.min(averageSize, mqAll.size() - startIndex);
            for (int i = 0; i < range; i++) {
                result.add(mqAll.get((startIndex + i) % mqAll.size()));
            }
            return result;
        }

        這個(gè)方法中,關(guān)鍵的分配方法就在后面幾行,如果只看代碼,會(huì)感覺有點(diǎn)暈,這里我舉一個(gè)例子來(lái)簡(jiǎn)單解釋下:

        假設(shè):messageQueue一共有6個(gè),consumer有4個(gè),當(dāng)前consumerindex為1,有了這些前提后,接下來(lái)我們就來(lái)看它的分配過程了。

        1. 計(jì)算取余操作:6 % 4 = 2,這表明messageQueue不能平均分配給每個(gè)consumer,接下來(lái)就來(lái)看看這個(gè)余數(shù)2是如何處理的

        2. 計(jì)算每個(gè)consumer平均處理的messageQueue數(shù)量

          消費(fèi)者索引0123
          處理數(shù)量2211
          • 這里需要注意,如果consumer數(shù)量大于messageQueue數(shù)量,那每個(gè)consumer最多只會(huì)分配到一個(gè)messageQueue,這種情況下,余數(shù)2不會(huì)進(jìn)行處理,并且有的consumer處理的messageQueue數(shù)量為0,同一個(gè)messageQueue不會(huì)同時(shí)被兩個(gè)及以上的consumer消費(fèi)掉
          • 這里的messageQueue數(shù)量為6,consumer為4,計(jì)算得到每個(gè)consumer處理的隊(duì)列數(shù)最少為1,除此之外,為了實(shí)現(xiàn)“平均”,有2個(gè)consumer會(huì)需要多處理1個(gè)messageQueue,按“平均”的分配原則,如果index小于mod,則會(huì)分配多1個(gè)messageQueue,這里的mod為2,結(jié)果如下:
        3. 分配完每個(gè)consumer處理的messageQueue數(shù)量后,這些messageQueue該如何分配呢?從代碼來(lái)看,分配時(shí)會(huì)先分配完一個(gè)consumer,再分配下一個(gè)consumer,最終結(jié)果就是這樣:

          隊(duì)列Q0Q1Q2Q3Q4Q5
          消費(fèi)者C1C1C2C2C4C5

        從圖中可以看到,在6個(gè)messageQueue、4個(gè)consumer、當(dāng)前consumerindex為1的情況下,當(dāng)前consumer會(huì)分到2個(gè)隊(duì)列,分別為Q2/Q3.

        messageQueue分配完成后,接下來(lái)就是更新負(fù)載信息了,方法為RebalanceImpl#updateProcessQueueTableInRebalance

        private boolean updateProcessQueueTableInRebalance(final String topic, 
                final Set<MessageQueue> mqSet, final boolean isOrder)
         
        {

            ...

            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn(...);
                        continue;
                    }

                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } 
                        // pullRequest 最初產(chǎn)生的地方:mq 不存在,就添加
                        else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            // 添加 pullRequest
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
            }

            // 發(fā)布
            this.dispatchPullRequest(pullRequestList);

            return changed;
        }

        這個(gè)方法中最最關(guān)鍵的就是pullRequestList的添加操作了:先遍歷傳入的MessageQueue,如果當(dāng)前consumer沒有消費(fèi)過該messageQueue,則添加一個(gè)新的pullRequestpullRequestList,之后就是發(fā)布pullRequestList了。

        看到這里,我們就應(yīng)該能明白,最初的pullRequest就是在這里產(chǎn)生的,而發(fā)布pullRequestList的操作,就是將pullRequest丟給pullMessageService線程處理了:

        /**
         * RebalancePushImpl#dispatchPullRequest:發(fā)布pullRequest的操作
         */

        public void dispatchPullRequest(List<PullRequest> pullRequestList) {
            for (PullRequest pullRequest : pullRequestList) {
                // 在這里執(zhí)行pullRequest,其實(shí)就是把 pullRequest 添加到
                // PullMessageService#pullRequestQueue 中
                this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
                log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
            }
        }

        限于篇幅,本文就先到這里了,下篇繼續(xù)。


        限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

        本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


        瀏覽 17
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            快播污视频 | 成人免费视频播放成人无码免费视频播放 | 激情五月天黄色电影 | 国产偷人精品高潮露脸 | 美女下面喷水视频 | 成人菠萝视频 | 91爱在线 | 嫩草影院入口 | 深夜免费福利视频 | 国产一卡二卡在线播放 |