1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        《RabbitMQ》| 解決消息延遲和堆積問題

        共 4233字,需瀏覽 9分鐘

         ·

        2021-11-10 07:27

        大家好,我是小菜。一個希望能夠成為 吹著牛X談架構(gòu) 的男人!如果你也想成為我想成為的人,不然點個關注做個伴,讓小菜不再孤單!

        本文主要介紹 RabbitMQ的常見問題

        如有需要,可以參考

        如有幫助,不忘 點贊 ?

        微信公眾號已開啟,小菜良記,沒關注的同學們記得關注哦!

        • 消息可靠性問題:如何確保發(fā)送的消息至少被消費一次?
        • 延遲消息問題:如何實現(xiàn)消息的延遲投遞?
        • 消息堆積問題:如何解決數(shù)百萬級以上消息堆積,無法及時消費問題?

        我們在上篇已經(jīng)說明了如何解決消息丟失的問題,也就是保證了消息的可靠性,那么其余兩個問題同樣重要,這篇我們將講述其余兩個問題的解決方式~!

        消息丟失解決方案:《RabbitMQ》 | 消息丟失也就這么回事

        一、延遲消息

        延遲消息 字面意思就是讓延遲接收消息,那么如何能讓消息延遲到達?這就是我們要思考解決的問題,在了解延遲隊列之前我們需要先明白 RabbitMQ 中的兩個概念

        • 死信交換機
        • TTL

        1)死信交換機

        死信(dead letter),也就是廢棄已死亡的消息,那什么情況下一個普通的消息能夠成為死信?需要符合以下三個條件:

        1. 消費者使用 basic.rejectbasic.nack 聲明消費失敗,并將消息的 requeue 參數(shù)設置為 false
        2. 消息是一個過期消息,超時后無人消費
        3. 要投遞的隊列消息堆積滿了,最早的消息就會成為死信

        死信交換機 便是 死信 的歸屬。

        如果一個隊列配置了 dead-letter-exchange 屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機就稱為 死信交換機 - DLXDead Letter Exchange

        步驟:當生產(chǎn)者正常投遞到隊列(simple.queue)中,如果消費者從隊列(simple.queue) 消費消息卻聲明了 reject,那并且隊列綁定了死信交換機(dl.queue),那么這個時候成為死信的消息就會投遞到這個死信隊列(dl.queue)中。

        79c6d20000294d5b4f43d615f6bb3d1a.webp死信投遞過程

        正常隊列 --> 死信隊列 的過程,我們必須聲明兩個關鍵信息

        • 死信交換機的名稱
        • 死信交換機與死信隊列綁定的路由key

        而這兩個信息也是我們投遞消息的基礎配置。

        接下來我們簡單模擬一下 條件1 所產(chǎn)生的場景

        1、首先聲明一個死信交換機和死信隊列

        我們這邊是使用簡單的注解方式直接生成

        1ceec37646e579d400186db2c4379c4f.webp生成死信交換機和死信隊列

        通過 RabbitMQ 控制臺界面可以看出已經(jīng)成功生成

        582f1eb00e613a28b63268a141dbdff9.webp
        2、聲明正常使用交換機與隊列

        然后這個時候我們就可以創(chuàng)建一個正常使用的交換機與隊列,并指明死信交換機

        27180e271c8609f49335e4337b357445.webp

        同樣可以通過控制臺查看創(chuàng)建狀態(tài)

        27180e271c8609f49335e4337b357445.webp

        其中是否有聲明死信交換機我們可以通過隊列的 DLXDLK 標志判斷

        3、模擬拒收

        然后我們現(xiàn)在通過代碼模擬客戶端拒絕消息的場景

        1)消息發(fā)送

        3e35bbfb18fcc2cd1863ee856f4c44e8.webp

        2)消息接收

        3e35bbfb18fcc2cd1863ee856f4c44e8.webp

        查看控制臺,結(jié)果如下:

        2021-11-06?23:56:52.095??INFO?2112?---?[ntContainer#0-1]?c.l.m.c.listener.SpringRabbitListener????:?正常業(yè)務交換機?|?接收到的消息?:?[hello]
        2021-11-06?23:56:52.118??INFO?2112?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機?|?接收到的消息?:?hello

        這說明我們死信交換機已經(jīng)成功發(fā)揮作用

        2)TTL

        以上我們已經(jīng)成功認識到了 死信交換機 的使用,但是這與我們一開始說的 延遲隊列 似乎并沒有太大關系,莫急~接下來說到的 TTL(Time-To-Live) 就是用來處理延遲消息的~!

        在 TTL 的概念中,如果一個隊列中的消息 TTL 結(jié)束后仍未被消費,那么這個消息就會自動變?yōu)樗佬牛?TTL 超時情況分為兩種:

        1. 消息所在的隊列設置了存活時間
        2. 消息本身設置了存活時間
        dc5384b28159208eaedb27d28cf8d64a.webp

        我們同樣進行上述 條件2 的模擬場景

        1、聲明死信交換機與死信隊列(上述已完成)
        2、聲明延遲隊列并指定死信交換機
        795338b120a773eb9c239305be8d5ffb.webp

        同樣控制臺查看創(chuàng)建結(jié)果,并且我們發(fā)現(xiàn)不止有 DLXDLK 標志,還多了個 TTL ,說明該隊列是延遲隊列

        1fec583091dbf2dd996799c498fe6dda.webp
        3、模擬消費超時情況

        我們往延遲隊列中發(fā)送一條消息,并且沒有消費者進行消費,等待 1 分鐘后查看是否能進入 死信隊列

        93044138bc7ff6a79c01823ac6e4349a.webp

        我們已經(jīng)發(fā)送了一條消息到延遲隊列并且一分鐘后也成功在控制臺發(fā)現(xiàn)了這條信息已經(jīng)進入到了死信交換機

        2021-11-07?00:01:30.854??INFO?32752?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機?|?接收到的消息?:?test?ttl-message

        以上是配置了隊列超時時間,消息本身自然也能配置超時時間,當 消息隊列 都存在超時時間時,那么就以最短的 TTL 為準,消息的超時配置如下:

        7cfd1269da86b3f6c26b477d550cb226.webp

        如上圖所示,我們可以利用 Message 這個類來傳遞消息信息,并設置上超時時間,我們設置的是 5000 ms,等待發(fā)送成功后,控制臺過5000 ms 也成功打印了死信交換機消費的消息:

        2021-11-07?00:03:09.048??INFO?39996?---?[ntContainer#1-1]?c.l.m.c.listener.SpringRabbitListener????:?死信交換機?|?接收到的消息?:?this?is?a?ttl?message

        3)延遲隊列

        我們上述是使用 死信交換機 來間接實現(xiàn) 延遲隊列 的效果,但實際在 RabbitMQ 不必如此麻煩,RabbitMQ 已經(jīng)為我們封裝好了插件,我們只需要下載安裝即可~

        RabbitMQ 插件下載地址

        我們進入地址可以發(fā)現(xiàn)有許多插件,搜索 delay 關鍵字找到我們需要的插件進行下載

        7cfd1269da86b3f6c26b477d550cb226.webp

        下載完后直接上傳到 RabbitMQ 的插件目錄 - plugins,小菜這邊是使用 docker 臨時安裝測試的,所以已經(jīng)將該插件目錄掛載出來了:

        docker?run?-itd?--name?rabbitmq?-v?plugins:/plugins?-p?15672:15672?-p?5672:5672?rabbitmq:management

        因此我這邊直接將插件上傳到容器中的 plugins 目錄即可~

        然后進入到容器中執(zhí)行以下命令進行插件開啟

        rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange
        8861bd7af5f3c7356b9ec2217ba8f3c2.webp

        并且我們在控制臺創(chuàng)建交換機的時候可以看到 type 類型多了個選項

        670c7faee20f25aefb05cbe262667e5e.webp

        成功執(zhí)行到這步就說明已經(jīng)開啟了 RabbitMQ 的延遲隊列功能

        那接下來我們就可以來使用 DelayExchange,首先我們需要了解代碼的方式創(chuàng)建延遲交換機:

        方式1
        6f7d65630f61afc3d9cebeeb9dda5c0b.webp
        方式2
        fda140282e78b53cfe34d66ba8b72782.webp

        當我們?nèi)f事具備之后就可以來發(fā)送消息了

        在發(fā)送消息的時候,消息頭中一定要攜帶上 x-delay 參數(shù),指定上延遲時間

        5261c3ff4e473bc064238a618c202ede.webp

        通過這樣配置之后,我們可以在控制臺看到,經(jīng)過10秒后 delay.queue 才收到對應消息,然后被對應消費者消費

        3)總結(jié)

        我們上面從 死信交換機TTL延遲隊列,一步步認識了如何實現(xiàn)延遲消息的功能,然后我們進行一個小小的總結(jié):

        問題1:什么樣的消息會成為死信?
        1. 消息被消費者 reject 或返回 nack
        2. 消息超時未及時消費
        3. 消息隊列滿了
        問題2:消息超時的方式
        1. 給隊列設置 TTL 屬性
        2. 給消息設置 TTL 屬性
        問題3:如何使用延遲隊列
        1. 下載并啟用 RabbitMQ 延遲隊列插件
        2. 聲明一個交換機,并將 delayed 屬性設置為 true
        3. 發(fā)送消息時,添加 x-delay 頭,值為超時時間
        問題4:延遲隊列的使用場景
        1. 延遲發(fā)送短信通知
        2. 訂單自動取消
        3. 庫存自動回滾

        二、惰性隊列

        講完延遲隊列,我們繼續(xù)來認識惰性隊列

        惰性隊列之前,我們先拋出一個問題~

        RabbitMQ 如何解決消息堆積問題

        什么情況下會出現(xiàn)消息堆積問題?

        1. 當生產(chǎn)者生產(chǎn)速度遠遠消費者消費速度
        2. 當消費者宕機沒有及時重啟

        那么如何解決這個問題?通常思路如下:

        1. 在消費者機器重啟后,增加更多的消費者進行處理
        2. 在消費者處理邏輯內(nèi)部開辟線程池,利用多線程的方式提高處理速度
        3. 擴大隊列的容量,提高堆積上限

        這幾個方式從理論上來說解決消息堆積問題也是沒有問題的,但是處理方式不夠優(yōu)雅甚至不夠靈活~ 那么除了以上的幾種解決方式,我們可以利用 RabbitMQ 中自帶的一種隊列類型 -- 惰性隊列

        什么是惰性隊列?我們認識一下惰性隊列的幾個特性:

        • 接收到消息后直接存入磁盤而非內(nèi)存
        • 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存中
        • 它支持百萬級消息的存儲

        說到底,就是利用磁盤的緩沖機制,而這種機制的缺點就是消息的時效性會降低,性能受限于磁盤的IO,認識特性和缺點之后,我們便來看看如何創(chuàng)建惰性隊列

        方式1
        e9d57588b40e78909c9cdbaa94a75d47.webp
        方式2
        290bc69e8db114e3782590c0517cc350.webp
        方式3

        該方式是直接基于命令行修改將一個正在運行中的隊列修改為惰性隊列

        rabbitmqctl?set_policy?Lazy?"^lazy-queue$"?'{"queue-mode":"lazy"}'?--apply-to?queues??

        其中幾個命令參數(shù)含義如下:

        • rabbitmqctl:命令行工具
        • set_policy:添加一個策略
        • Lazy:策略名稱,可以自定義
        • ^lazy-queue$:用正則表達式匹配隊列的名稱
        • '{"queue-mode":"lazy"}':設置隊列為 lazy 模式
        • --apply-to queues:策略的作用對象,是所有的隊列

        這種惰性隊列的方式盡管缺點是消息時效性會降低,但是在某些場景下也不是不能接受,何況它的優(yōu)點同樣明顯:

        • 基于磁盤存儲,消息上限高
        • 沒有間歇性的 page-out,性能穩(wěn)定

        到這里,我們就已經(jīng)講述了 RabbitMQ 的常見問題,對于我們來說,普通的開發(fā)場景可能比較少遇到這些問題,但是沒遇到不等于沒有,所以我們還是需要多認識來防患于未然!

        不要空談,不要貪懶,和小菜一起做個吹著牛X做架構(gòu)的程序猿吧~點個關注做個伴,讓小菜不再孤單。咱們下文見!

        0517677a2f55515d134bee26036b68b3.webp看完不贊,都是壞蛋

        今天的你多努力一點,明天的你就能少說一句求人的話!我是小菜,一個和你一起變強的男人。 ??微信公眾號已開啟,小菜良記,沒關注的同學們記得關注哦!


        瀏覽 77
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            在全网站搜索美日逼 | 欧洲中文字幕日韩精品成人 | 无遮挡裸光屁屁打屁股绿帽社 | 国产日韩在线一级 | www.蜜乳av.com | 国产精品爽爽久久久 | 欧美精品xxx | 老师洗澡时让我进去摸她那个 | 玉女名器爽到娇喘不停小说 | 日本精品视频在线 |