1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        延時(shí)消息常見實(shí)現(xiàn)方案

        共 4704字,需瀏覽 10分鐘

         ·

        2022-01-22 16:08



        ?1?
        前言

        延時(shí)消息(定時(shí)消息)指的在分布式異步消息場(chǎng)景下,生產(chǎn)端發(fā)送一條消息,希望在指定延時(shí)或者指定時(shí)間點(diǎn)被消費(fèi)端消費(fèi)到,而不是立刻被消費(fèi)。


        延時(shí)消息適用的業(yè)務(wù)場(chǎng)景非常的廣泛,在分布式系統(tǒng)環(huán)境下,延時(shí)消息的功能一般會(huì)在下沉到中間件層,通常是 MQ 中內(nèi)置這個(gè)功能或者內(nèi)聚成一個(gè)公共基礎(chǔ)服務(wù)。


        本文旨在探討常見延時(shí)消息的實(shí)現(xiàn)方案以及方案設(shè)計(jì)的優(yōu)缺點(diǎn)。


        ?2?
        實(shí)現(xiàn)方案


        1. 基于外部存儲(chǔ)實(shí)現(xiàn)的方案

        這里討論的外部存儲(chǔ)指的是在 MQ 本身自帶的存儲(chǔ)以外又引入的其他的存儲(chǔ)系統(tǒng)。


        基于外部存儲(chǔ)的方案本質(zhì)上都是一個(gè)套路,將 MQ 和 延時(shí)模塊 區(qū)分開來,延時(shí)消息模塊是一個(gè)獨(dú)立的服務(wù)/進(jìn)程。延時(shí)消息先保留到其他存儲(chǔ)介質(zhì)中,然后在消息到期時(shí)再投遞到 MQ。當(dāng)然還有一些細(xì)節(jié)性的設(shè)計(jì),比如消息進(jìn)入的延時(shí)消息模塊時(shí)已經(jīng)到期則直接投遞這類的邏輯,這里不展開討論。



        下述方案不同的是,采用了不同的存儲(chǔ)系統(tǒng)。

        基于 數(shù)據(jù)庫(如MySQL)


        基于關(guān)系型數(shù)據(jù)庫(如MySQL)延時(shí)消息表的方式來實(shí)現(xiàn)。

        CREATE TABLE `delay_msg` (  `id` bigint unsigned NOT NULL AUTO_INCREMENT,  `delivery_time` DATETIME NOT NULL COMMENT '投遞時(shí)間',  `payloads` blob COMMENT '消息內(nèi)容',  PRIMARY KEY (`id`),  KEY `time_index` (`delivery_time`))

        通過定時(shí)線程定時(shí)掃描到期的消息,然后進(jìn)行投遞。定時(shí)線程的掃描間隔理論上就是你延時(shí)消息的最小時(shí)間精度。


        優(yōu)點(diǎn):

        • 實(shí)現(xiàn)簡(jiǎn)單;


        缺點(diǎn):

        • B+Tree索引不適合消息場(chǎng)景的大量寫入;


        ?2?
        基于 RocksDB

        RocksDB 的方案其實(shí)就是在上述方案上選擇了比較合適的存儲(chǔ)介質(zhì)。


        RocksDB 在筆者之前的文章中有聊過,LSM 樹更適合大量寫入的場(chǎng)景。滴滴開源的DDMQ中的延時(shí)消息模塊 Chronos 就是采用了這個(gè)方案。


        DDMQ 這個(gè)項(xiàng)目簡(jiǎn)單來說就是在 RocketMQ 外面加了一層統(tǒng)一的代理層,在這個(gè)代理層就可以做一些功能維度的擴(kuò)展。延時(shí)消息的邏輯就是代理層實(shí)現(xiàn)了對(duì)延時(shí)消息的轉(zhuǎn)發(fā),如果是延時(shí)消息,會(huì)先投遞到 RocketMQ 中 Chronos 專用的 topic 中。延時(shí)消息模塊 Chronos 消費(fèi)得到延時(shí)消息轉(zhuǎn)儲(chǔ)到 RocksDB,后面就是類似的邏輯了,定時(shí)掃描到期的消息,然后往 RocketMQ 中投遞。



        這個(gè)方案老實(shí)說是一個(gè)比較重的方案。因?yàn)榛?RocksDB 來實(shí)現(xiàn)的話,從數(shù)據(jù)可用性的角度考慮,你還需要自己去處理多副本的數(shù)據(jù)同步等邏輯。


        優(yōu)點(diǎn):

        • RocksDB LSM 樹很適合消息場(chǎng)景的大量寫入;


        缺點(diǎn):

        • 實(shí)現(xiàn)方案較重,如果你采用這個(gè)方案,需要自己實(shí)現(xiàn) RocksDB 的數(shù)據(jù)容災(zāi)邏輯;


        ?2?
        基于Redis

        再來聊聊 Redis 的方案。下面放一個(gè)比較完善的方案。

        本方案來源于:基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列服務(wù)


        • Messages Pool 所有的延時(shí)消息存放,結(jié)構(gòu)為KV結(jié)構(gòu),key為消息ID,value為一個(gè)具體的message(這里選擇Redis Hash結(jié)構(gòu)主要是因?yàn)閔ash結(jié)構(gòu)能存儲(chǔ)較大的數(shù)據(jù)量,數(shù)據(jù)較多時(shí)候會(huì)進(jìn)行漸進(jìn)式rehash擴(kuò)容,并且對(duì)于HSET和HGET命令來說時(shí)間復(fù)雜度都是O(1))

        • Delayed Queue是16個(gè)有序隊(duì)列(隊(duì)列支持水平擴(kuò)展),結(jié)構(gòu)為ZSET,value 為 messages pool中消息ID,score為過期時(shí)間**(分為多個(gè)隊(duì)列是為了提高掃描的速度)**

        • Worker 代表處理線程,通過定時(shí)任務(wù)掃描 Delayed Queue 中到期的消息


        這個(gè)方案選用 Redis 存儲(chǔ)在我看來有幾點(diǎn)考慮。

        • Redis ZSET 很適合實(shí)現(xiàn)延時(shí)隊(duì)列

        • 性能問題,雖然 ZSET 插入是一個(gè) O(logn) 的操作,但是Redis 基于內(nèi)存操作,并且內(nèi)部做了很多性能方面的優(yōu)化。


        但是這個(gè)方案其實(shí)也有需要斟酌的地方,上述方案通過創(chuàng)建多個(gè) Delayed Queue 來滿足對(duì)于并發(fā)性能的要求,但這也帶來了多個(gè) Delayed Queue 如何在多個(gè)節(jié)點(diǎn)情況下均勻分配,并且很可能出現(xiàn)到期消息并發(fā)重復(fù)處理的情況,是否要引入分布式鎖之類的并發(fā)控制設(shè)計(jì)?


        在量不大的場(chǎng)景下,上述方案的架構(gòu)其實(shí)可以蛻化成主從架構(gòu),只允許主節(jié)點(diǎn)來處理任務(wù),從節(jié)點(diǎn)只做容災(zāi)備份。實(shí)現(xiàn)難度更低更可控。


        ?3?
        定時(shí)線程檢查的缺陷與改進(jìn)


        上述幾個(gè)方案中,都通過線程定時(shí)掃描的方案來獲取到期的消息。


        定時(shí)線程的方案在消息量較少的時(shí)候,會(huì)浪費(fèi)資源,在消息量非常多的時(shí)候,又會(huì)出現(xiàn)因?yàn)閽呙栝g隔設(shè)置不合理導(dǎo)致延時(shí)時(shí)間不準(zhǔn)確的問題??梢越柚?JDK Timer 類中的思想,通過 wait-notify 來節(jié)省 CPU 資源。


        獲取中最近的延時(shí)消息,然后wait(執(zhí)行時(shí)間-當(dāng)前時(shí)間),這樣就不需要浪費(fèi)資源到達(dá)時(shí)間時(shí)會(huì)自動(dòng)響應(yīng),如果有新的消息進(jìn)入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個(gè)更小的消息,然后又wait,如此循環(huán)。


        2. 開源 MQ 中的實(shí)現(xiàn)方案


        再來講講目前自帶延時(shí)消息功能的開源MQ,它們是如何實(shí)現(xiàn)的


        ?4?
        RocketMQ


        RocketMQ 開源版本支持延時(shí)消息,但是只支持 18 個(gè) Level 的延時(shí),并不支持任意時(shí)間。只不過這個(gè) Level 在 RocketMQ 中可以自定義的,所幸來說對(duì)普通業(yè)務(wù)算是夠用的。默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個(gè)level。

        通俗的講,設(shè)定了延時(shí) Level 的消息會(huì)被暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù) level 存入特定的queue,queueId = delayTimeLevel – 1,**即一個(gè)queue只存相同延時(shí)的消息,保證具有相同發(fā)送延時(shí)的消息能夠順序消費(fèi)。**broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫入真實(shí)的topic。

        下面是整個(gè)實(shí)現(xiàn)方案的示意圖,紅色代表投遞延時(shí)消息,紫色代表定時(shí)調(diào)度到期的延時(shí)消息:


        優(yōu)點(diǎn):
        • Level 數(shù)固定,每個(gè) Level 有自己的定時(shí)器,開銷不大
        • 將 Level 相同的消息放入到同一個(gè) Queue 中,保證了同一 Level 消息的順序性;不同 Level 放到不同的 Queue 中,保證了投遞的時(shí)間準(zhǔn)確性;
        • 通過只支持固定的Level,將不同延時(shí)消息的排序變成了固定Level Topic 的追加寫操作

        缺點(diǎn):
        • Level 配置的修改代價(jià)太大,固定 Level 不靈活
        • CommitLog 會(huì)因?yàn)檠訒r(shí)消息的存在變得很大


        ?5?
        Pulsar

        Pulsar 支持“任意時(shí)間”的延時(shí)消息,但實(shí)現(xiàn)方式和 RocketMQ 不同。

        通俗的講,Pulsar 的延時(shí)消息會(huì)直接進(jìn)入到客戶端發(fā)送指定的 Topic 中,然后在堆外內(nèi)存中創(chuàng)建一個(gè)基于時(shí)間的優(yōu)先級(jí)隊(duì)列,來維護(hù)延時(shí)消息的索引信息。延時(shí)時(shí)間最短的會(huì)放在頭上,時(shí)間越長(zhǎng)越靠后。在進(jìn)行消費(fèi)邏輯時(shí)候,再判斷是否有到期需要投遞的消息,如果有就從隊(duì)列里面拿出,根據(jù)延時(shí)消息的索引查詢到對(duì)應(yīng)的消息進(jìn)行消費(fèi)。

        如果節(jié)點(diǎn)崩潰,在這個(gè) broker 節(jié)點(diǎn)上的 Topics 會(huì)轉(zhuǎn)移到其他可用的 broker 上,上面提到的這個(gè)優(yōu)先級(jí)隊(duì)列也會(huì)被重建。

        下面是 Pulsar 公眾號(hào)中對(duì)于 Pulsar 延時(shí)消息的示意圖。

        乍一看會(huì)覺得這個(gè)方案其實(shí)非常簡(jiǎn)單,還能支持任意時(shí)間的消息。但是這個(gè)方案有幾個(gè)比較大的問題:

        • 內(nèi)存開銷:?維護(hù)延時(shí)消息索引的隊(duì)列是放在堆外內(nèi)存中的,并且這個(gè)隊(duì)列是以訂閱組(Kafka中的消費(fèi)組)為維度的,比如你這個(gè) Topic 有 N 個(gè)訂閱組,那么如果你這個(gè) Topic 使用了延時(shí)消息,就會(huì)創(chuàng)建 N 個(gè) 隊(duì)列;并且隨著延時(shí)消息的增多,時(shí)間跨度的增加,每個(gè)隊(duì)列的內(nèi)存占用也會(huì)上升。(是的,在這個(gè)方案下,支持任意的延時(shí)消息反而有可能讓這個(gè)缺陷更嚴(yán)重)
        • 故障轉(zhuǎn)移之后延時(shí)消息索引隊(duì)列的重建時(shí)間開銷:?對(duì)于跨度時(shí)間長(zhǎng)的大規(guī)模延時(shí)消息,重建時(shí)間可能會(huì)到小時(shí)級(jí)別。(摘自 Pulsar 官方公眾號(hào)文章)
        • 存儲(chǔ)開銷:延時(shí)消息的時(shí)間跨度會(huì)影響到 Pulsar 中已經(jīng)消費(fèi)的消息數(shù)據(jù)的空間回收。打個(gè)比方,你的 Topic 如果業(yè)務(wù)上要求支持一個(gè)月跨度的延時(shí)消息,然后你發(fā)了一個(gè)延時(shí)一個(gè)月的消息,那么你這個(gè) Topic 中底層的存儲(chǔ)就會(huì)保留整整一個(gè)月的消息數(shù)據(jù),即使這一個(gè)月中99%的正常消息都已經(jīng)消費(fèi)了。

        對(duì)于前面第一點(diǎn)和第二點(diǎn)的問題,社區(qū)也設(shè)計(jì)了解決方案,在隊(duì)列中加入時(shí)間分區(qū),Broker 只加載當(dāng)前較近的時(shí)間片的隊(duì)列到內(nèi)存,其余時(shí)間片分區(qū)持久化磁盤,示例圖如下圖所示:

        但是目前,這個(gè)方案并沒有對(duì)應(yīng)的實(shí)現(xiàn)版本??梢栽趯?shí)際使用時(shí),規(guī)定只能使用較小時(shí)間跨度的延時(shí)消息,來減少前兩點(diǎn)缺陷的影響。

        另外,因?yàn)閮?nèi)存中存的并不是延時(shí)消息的全量數(shù)據(jù),只是索引,所以可能要積壓上百萬條延時(shí)消息才可能對(duì)內(nèi)存造成顯著影響,從這個(gè)角度來看,官方暫時(shí)沒有完善前兩個(gè)問題也可以理解了。

        至于第三個(gè)問題,估計(jì)是比較難解決的,需要在數(shù)據(jù)存儲(chǔ)層將延時(shí)消息和正常消息區(qū)分開來,單獨(dú)存儲(chǔ)延時(shí)消息。

        ?6?
        QMQ

        QMQ提供任意時(shí)間的延時(shí)/定時(shí)消息,你可以指定消息在未來兩年內(nèi)(可配置)任意時(shí)間內(nèi)投遞。

        把 QMQ 放到最后,是因?yàn)槲矣X得 QMQ 是目前開源 MQ 中延時(shí)消息設(shè)計(jì)最合理的。里面設(shè)計(jì)的核心簡(jiǎn)單來說就是?多級(jí)時(shí)間輪 + 延時(shí)加載 + 延時(shí)消息單獨(dú)磁盤存儲(chǔ)。
        如果對(duì)時(shí)間輪不熟悉的可以閱讀筆者的這篇文章?從 Kafka 看時(shí)間輪算法設(shè)計(jì)

        QMQ的延時(shí)/定時(shí)消息使用的是兩層 hash wheel 來實(shí)現(xiàn)的。

        第一層位于磁盤上,每個(gè)小時(shí)為一個(gè)刻度(默認(rèn)為一個(gè)小時(shí)一個(gè)刻度,可以根據(jù)實(shí)際情況在配置里進(jìn)行調(diào)整),每個(gè)刻度會(huì)生成一個(gè)日志文件(schedule log),因?yàn)镼MQ支持兩年內(nèi)的延時(shí)消息(默認(rèn)支持兩年內(nèi),可以進(jìn)行配置修改),則最多會(huì)生成 2 * 366 * 24 = 17568 個(gè)文件(如果需要支持的最大延時(shí)時(shí)間更短,則生成的文件更少)。

        第二層在內(nèi)存中,當(dāng)消息的投遞時(shí)間即將到來的時(shí)候,會(huì)將這個(gè)小時(shí)的消息索引(索引包括消息在schedule log中的offset和size)從磁盤文件加載到內(nèi)存中的hash wheel上,內(nèi)存中的hash wheel則是以500ms為一個(gè)刻度。

        總結(jié)一下設(shè)計(jì)上的亮點(diǎn):
        • 時(shí)間輪算法適合延時(shí)/定時(shí)消息的場(chǎng)景,省去延時(shí)消息的排序,插入刪除操作都是 O(1) 的時(shí)間復(fù)雜度;
        • 通過多級(jí)時(shí)間輪設(shè)計(jì),支持了超大時(shí)間跨度的延時(shí)消息;
        • 通過延時(shí)加載,內(nèi)存中只會(huì)有最近要消費(fèi)的消息,更久的延時(shí)消息會(huì)被存儲(chǔ)在磁盤中,對(duì)內(nèi)存友好;
        • 延時(shí)消息單獨(dú)存儲(chǔ)(schedule log),不會(huì)影響到正常消息的空間回收;


        ?7?
        總結(jié)

        本文匯總了目前業(yè)界常見的延時(shí)消息方案,并且討論了各個(gè)方案的優(yōu)缺點(diǎn)。希望對(duì)讀者有所啟發(fā)。

        作者:Richard_Yi

        來源:juejin.cn/post/7052894117105238053

        瀏覽 52
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            青娱乐国产 | 日本少妇片 | www.老色 | 东方成人AV | 国产喷水视频 | 色色色色色色网站 | 国产伦精品一区二区三区视频1 | 青青草91 | 丁香五月深深爱 | 麻豆传媒操逼 |