1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Rocketmq源碼分析15:延遲消息

        共 35925字,需瀏覽 72分鐘

         ·

        2021-05-02 15:10

        注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

        rocketmq支持延遲消息,本文我們將從源碼角度分析延遲消息的實(shí)現(xiàn)原理。

        1. demo 準(zhǔn)備

        延遲消息的demo在org.apache.rocketmq.example.delay包下,發(fā)送消息的producer如下:

        public class Producer {

            public static void main(String[] args) throws MQClientException, InterruptedException {
                String nameServer = "localhost:9876";
                DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
                producer.setNamesrvAddr(nameServer);
                producer.start();

                for (int i = 0; i < 1; i++)
                    try {
                        {
                            Message msg = new Message("TopicTest",
                                "TagA",
                                "OrderID188",
                                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                            // delayLevel=1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
                            // delayTime =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                            // 設(shè)置延遲延遲級(jí)別
                            msg.setDelayTimeLevel(5);
                            SendResult sendResult = producer.send(msg);
                            System.out.printf("%s%n", sendResult);
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                producer.shutdown();
            }
        }

        rocketmq在實(shí)現(xiàn)延遲消息時(shí),會(huì)準(zhǔn)備18個(gè)延遲級(jí)別,這些級(jí)別對(duì)應(yīng)的延遲時(shí)間如下:

        123456789101112131415161718
        1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

        在發(fā)送延遲消息時(shí),需要指定消息的延遲級(jí)別:

        msg.setDelayTimeLevel(5);

        這里指定的延遲級(jí)別為5,即延遲1分鐘后發(fā)送。

        2. 延遲消息的存儲(chǔ)

        延遲消息與普通消息的發(fā)送并無太多差別,不過在broker在存儲(chǔ)延遲消息時(shí),會(huì)做一些額外的處理,進(jìn)入CommitLog#asyncPutMessage方法:

         public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
            // 消息的存儲(chǔ)時(shí)間
            msg.setStoreTimestamp(System.currentTimeMillis());
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            AppendMessageResult result = null;

            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

            String topic = msg.getTopic();
            int queueId = msg.getQueueId();

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                // 延遲消息
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore
                            .getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(
                            this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
                    // 指定延遲消息對(duì)應(yīng)的topic
                    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                    // 延遲級(jí)別對(duì)應(yīng)的隊(duì)列,即每個(gè)延遲級(jí)別都對(duì)應(yīng)一條隊(duì)列
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                    // 原始的topic與queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, 
                        String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }

            // 省略消息寫入 commitLog 的操作
            ...
        }

        在延遲消息寫入前,會(huì)做一些特別處理,其實(shí)就是將消息的topicqueueId修改為延遲消息專用的topicqueueId。

        獲取延遲隊(duì)列的方法為ScheduleMessageService#delayLevel2QueueId,代碼如下:

        public static int delayLevel2QueueId(final int delayLevel) {
            return delayLevel - 1;
        }

        這里的delayLevel,就對(duì)應(yīng)前面提到的18個(gè)延遲級(jí)別,這也就是說,每個(gè)延遲級(jí)別的消息都會(huì)有一個(gè)專門隊(duì)列來存儲(chǔ)。這樣存儲(chǔ)有何好處呢?最大的好處就是避免了排序,舉個(gè)簡單的例子:上午10:00broker收到了一條延遲消息1,延遲級(jí)別為5;然后在10:02又收到了一條延遲消息2,延遲級(jí)別也為5,由于延遲級(jí)別相同,他們會(huì)存儲(chǔ)在同一條隊(duì)列中.

        由于隊(duì)列天生有序,入隊(duì)時(shí)間先按送達(dá)broker的時(shí)間先后進(jìn)行排序,而同一隊(duì)列上延遲時(shí)間也相同,因此延遲消息1一定會(huì)在延遲消息2前進(jìn)行消消費(fèi),后面如果有消息再進(jìn)入該隊(duì)列中,也會(huì)按照先進(jìn)先出的方式進(jìn)行消費(fèi)。

        3. 延遲消息的投遞

        上一節(jié)分析了延遲消息的存儲(chǔ),本節(jié)我們來分析延遲消息的消費(fèi)。

        延遲消息存儲(chǔ)到隊(duì)列后,會(huì)有一個(gè)專門的線程定期掃描這些隊(duì)列,找到滿足消費(fèi)時(shí)間的消息,然后將其投遞到真正的topicqueueId中,這樣這條消息就能被consumer消息了。

        處理延遲隊(duì)列掃描的線程為scheduleMessageService,它在DefaultMessageStore#start方法中啟動(dòng):

        public void start() throws Exception {
            ...
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                this.haService.start();
                // 這里處理延遲消息
                this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
            }
            ...
        }

        繼續(xù)跟進(jìn),進(jìn)入DefaultMessageStore#handleScheduleMessageService 方法:

        @Override
        public void handleScheduleMessageService(final BrokerRole brokerRole) {
            if (this.scheduleMessageService != null) {
                if (brokerRole == BrokerRole.SLAVE) {
                    this.scheduleMessageService.shutdown();
                } else {
                    // 啟動(dòng)
                    this.scheduleMessageService.start();
                }
            }

        }

        繼續(xù)跟進(jìn),進(jìn)入ScheduleMessageService#start方法:

        /**
         * 延遲消息服務(wù)的啟動(dòng)方式
         */

        public void start() {
            // CAS 鎖機(jī)制保證必須 shutdown 后才能再次start
            if (started.compareAndSet(falsetrue)) {
                this.timer = new Timer("ScheduleMessageTimerThread"true);
                for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                    Integer level = entry.getKey();
                    Long timeDelay = entry.getValue();
                    Long offset = this.offsetTable.get(level);
                    if (null == offset) {
                        offset = 0L;
                    }

                    if (timeDelay != null) {
                        // 定時(shí)執(zhí)行延遲消息處理任務(wù)
                        this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                    }
                }
                // 每隔10s,將延遲消息的相關(guān)信息持久化到硬盤中
                this.timer.scheduleAtFixedRate(new TimerTask() {

                    @Override
                    public void run() {
                        try {
                            if (started.get()) ScheduleMessageService.this.persist();
                        } catch (Throwable e) {
                            log.error("scheduleAtFixedRate flush exception", e);
                        }
                    }
                }, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
            }
        }

        在這個(gè)線程中,主要做了兩件事:

        1. 遍歷所有的延遲級(jí)別,為每個(gè)延遲級(jí)別在延遲FIRST_DELAY_TIME毫秒后就處理延遲消息的投遞操作
        2. 開啟執(zhí)久化定時(shí)任務(wù):定時(shí)將延遲消息的相關(guān)信息持久化到硬盤中

        3.1 投遞操作

        處理延遲消息的投遞任務(wù)為DeliverDelayedMessageTimerTask#run方法,代碼如下:

        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

        在這個(gè)方法中,調(diào)用了executeOnTimeup()方法繼續(xù)操作,我們?cè)龠M(jìn)入ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup方法:

        public void executeOnTimeup() {
            // 獲得一條隊(duì)列
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore
                    .findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                        delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    log.error(...);
                                    // 1. 消息的寫入的時(shí)間
                                    long msgStoreTime = defaultMessageStore.getCommitLog()
                                        .pickupStoreTimestamp(offsetPy, sizePy);
                                    // 2. 計(jì)算投遞時(shí)間,投遞時(shí)間 = 消息寫入時(shí)間 + 延遲級(jí)別對(duì)應(yīng)的時(shí)間
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            // 處理投遞時(shí)間,保證投遞時(shí)間必須小于(當(dāng)前時(shí)間 + 延遲級(jí)別對(duì)應(yīng)的時(shí)間)
                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            // 小于等于0,表示消費(fèi)需要投遞
                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
                                                .equals(msgInner.getTopic())) {
                                            log.error(...);
                                            continue;
                                        }
                                        // 3. 投遞操作
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null && putMessageResult
                                                .getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(...);
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        log.error(...);
                                    }
                                }
                            } else {
                                // 4. 安排下一次執(zhí)行,執(zhí)行時(shí)間為 countdown 毫秒后
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                // 5. 更新偏移量
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        }
                        // 之后再執(zhí)行
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        // 更新偏移量
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                }
                else {

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error(...);
                    }
                }
            }

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

        這個(gè)方法雖然有點(diǎn)長,但邏輯很清晰,執(zhí)行過程如下:

        1. 獲取消息寫入時(shí)間,就是寫入到commitLog的時(shí)間
        2. 計(jì)算投遞時(shí)間,投遞時(shí)間 = 消息寫入時(shí)間 + 延遲級(jí)別對(duì)應(yīng)的時(shí)間,如果當(dāng)前時(shí)間大于等于投遞時(shí)間,就表示消息需要進(jìn)行投遞操作
        3. 如果消息滿足投遞時(shí)間,就進(jìn)行投遞操作,所謂的投遞操作,就是將消息寫入到真正的topicqueueId的隊(duì)列中
        4. 如果當(dāng)前消息不滿足投遞時(shí)間,就表明該隊(duì)列上之后的消息也不會(huì)投遞時(shí)間,就計(jì)算投遞時(shí)間與當(dāng)前時(shí)間的差值,這個(gè)差值就是下次執(zhí)行executeOnTimeup()方法的時(shí)間
        5. 更新偏移量,就是記錄當(dāng)前隊(duì)列的消費(fèi)位置

        我們來看看偏移量的更新操作,進(jìn)入ScheduleMessageService#updateOffset方法:

        public class ScheduleMessageService extends ConfigManager {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

            private static final long FIRST_DELAY_TIME = 1000L;
            private static final long DELAY_FOR_A_WHILE = 100L;
            private static final long DELAY_FOR_A_PERIOD = 10000L;

            /** 延遲級(jí)別對(duì)應(yīng)的延遲時(shí)間,單位為毫秒 */
            private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
                new ConcurrentHashMap<Integer, Long>(32);

            /** 延遲級(jí)別對(duì)應(yīng)的偏移量 */
            private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
                new ConcurrentHashMap<Integer, Long>(32);

            private final DefaultMessageStore defaultMessageStore;
            private final AtomicBoolean started = new AtomicBoolean(false);
            private Timer timer;
            private MessageStore writeMessageStore;
            private int maxDelayLevel;

            ...

            /**
             * 更新偏移量的操作
             */

            private void updateOffset(int delayLevel, long offset) {
                this.offsetTable.put(delayLevel, offset);
            }

            ...
        }

        可以看到,這里的更新偏移量,就是將當(dāng)前延遲級(jí)別消費(fèi)位置的偏移量添加到offsetTable中進(jìn)行保存。

        3.2 持久化

        讓我們回到``ScheduleMessageService#start`方法,這個(gè)方法中開啟了一個(gè)持久化任務(wù):

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

        該任務(wù)會(huì)定期執(zhí)行ConfigManager#persist方法進(jìn)行持久化操作:

        public synchronized void persist() {
            String jsonString = this.encode(true);
            if (jsonString != null) {
                String fileName = this.configFilePath();
                try {
                    MixAll.string2File(jsonString, fileName);
                } catch (IOException e) {
                    log.error("persist file " + fileName + " exception", e);
                }
            }
        }

        這個(gè)方法主要進(jìn)行了兩個(gè)操作:

        1. 調(diào)用this.encode(true)得到json字符串
        2. json字符串寫入到文件中

        這個(gè)json字符串是個(gè)啥呢?我們進(jìn)入ScheduleMessageService#encode(boolean)方法:

        public String encode(final boolean prettyFormat) {
            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper 
                = new DelayOffsetSerializeWrapper();
            // 這個(gè) offsetTable 就是用來保存消費(fèi)位置偏移量的
            delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
            return delayOffsetSerializeWrapper.toJson(prettyFormat);
        }

        從代碼來看,這個(gè)方法就是將ScheduleMessageService#offsetTable序列化成json字符串的, 這個(gè) offsetTable 就是用來保存消費(fèi)位置偏移量的。由此不難得出這個(gè)定時(shí)任務(wù)的作用:定期將延遲隊(duì)列的消費(fèi)位置偏移量持久化到文件中。

        4. 總結(jié)

        1. RocketMq支持了18種延遲級(jí)別,每個(gè)延遲級(jí)別對(duì)應(yīng)不同的延遲時(shí)間
        2. 延遲消息對(duì)應(yīng)著一個(gè)topic,每個(gè)延遲級(jí)別都對(duì)應(yīng)著該topic下的一個(gè)隊(duì)列
        3. 當(dāng)broker收到延遲消息后,會(huì)將該消息放入到延遲級(jí)別對(duì)應(yīng)的延遲消息中
        4. 消息投遞由定時(shí)線程執(zhí)行,當(dāng)消息達(dá)到投遞時(shí)間后,會(huì)從延遲隊(duì)列中寫入到真正需要投遞的隊(duì)列中

        客觀來說,開源版 RocketMq 的延遲消息比較簡陋,僅支持18種延遲級(jí)別,而阿里云版可指定發(fā)送時(shí)間。


        限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

        本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!


        瀏覽 41
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            凉森玲梦一区二区三区av免费 | 堕落人妻4玛丽莲果冻传媒 | 国产黄在线视频 | 朋友娇妻的滋味hd | 鸡巴网站 | 中文字幕日韩在线观看 | 《貂蝉艳史》完整版 | 日韩一区二区不卡 | 天天做天天爱夜夜爽少妇 | 国产一级一级农村 |