1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        如何基于RocketMQ設(shè)計(jì)一套全鏈路消息不丟失方案?

        共 3339字,需瀏覽 7分鐘

         ·

        2021-11-19 06:16

        我們使用MQ作為消息中間件,傳輸一些消息的時(shí)候,必須考慮到消息丟失的可能。因?yàn)橛械臅r(shí)候消息丟失了,會產(chǎn)生很嚴(yán)重的后果,比如消息計(jì)費(fèi)數(shù)據(jù),跟錢有關(guān)的消息。


        這篇文章我們以RocketMQ為例來講解,如何設(shè)計(jì)一套全鏈路消息不丟失的方案。


        接下來我們分別講下生產(chǎn)者、broker、消費(fèi)者,如何確保消息不丟失的。


        1、生產(chǎn)者如何確保消息不丟失?


        發(fā)送消息的時(shí)候,可能存在消息的丟失,就是說可能消息根本就沒有進(jìn)入到MQ就丟了,我們看下面的圖。

        圖1?生產(chǎn)者丟失消息


        解決生產(chǎn)者丟失消息,一般有兩種方法。


        (1)重試發(fā)消


        RocketMQ生產(chǎn)者發(fā)送消息一般有三種api:

        • 同步發(fā)送

        • 異步發(fā)送

        • OneWay 發(fā)送
        同步發(fā)送,就是生產(chǎn)者向broker發(fā)送消息,阻塞當(dāng)前線程等待broker響應(yīng)發(fā)送結(jié)果。

        異步發(fā)送,就是生產(chǎn)者首先創(chuàng)建一個(gè)向broker發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。

        Oneway發(fā)送,就是生產(chǎn)者只負(fù)責(zé)發(fā)送請求,不等待應(yīng)答,生產(chǎn)者只負(fù)責(zé)把請求發(fā)出去,而不處理響應(yīng)結(jié)果。

        為了確保消息一定發(fā)送到了broker,我們可以采用同步發(fā)送的方式,然后等待發(fā)送的結(jié)果。一直等待,如果消息發(fā)送失敗,或者M(jìn)Q內(nèi)部異常,我們肯定會收到一個(gè)異常,比如請求超時(shí),或者網(wǎng)絡(luò)錯(cuò)誤。

        如果我們在收到異常之后,就認(rèn)為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應(yīng)給我們,這樣反復(fù)重試,是否可以確保消息一定會到達(dá)MQ?
        圖2?生產(chǎn)者重試發(fā)送消息

        理論上一些短暫網(wǎng)絡(luò)異常的場景下,我們是可以通過不停的重試去保證消息到達(dá)MQ的,因?yàn)槿绻虝r(shí)間網(wǎng)絡(luò)異常了消息一直沒法發(fā)送,我們只要不停的重試,網(wǎng)絡(luò)一旦恢復(fù)了,消息就可以發(fā)送到MQ了。

        如果要是反復(fù)重試多次還是沒法把消息投遞到MQ,此時(shí)我們就可以直接當(dāng)作消息發(fā)送失敗了。

        其代碼就像是這樣的:
        try {    doSomething();    // 發(fā)送消息到RocketMQ    producer.sendMessage();} catch (Exception e) {for (int i = 0; i < 3; ++i) {// 重試發(fā)消息        producer.sendMessage();    }    // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。}
        另外,如果你是本地先執(zhí)行一些數(shù)據(jù)庫操作,再把消息發(fā)送到RocketMQ,那么就需要注意把本地事務(wù)與發(fā)送消息到RocketMQ放在一個(gè)事務(wù)里,保證執(zhí)行本地事務(wù)和發(fā)送消息要么一起成功,要么一起失敗。

        @Transactional(rollbackFor = Exception.class)public void payOrderSuccess()    // 執(zhí)行本地事務(wù)    try {        doSomething();// 發(fā)送消息到RocketMQ        producer.sendMessage();    } catch (Exception e) {for (int i = 0; i < 3; ++i) {// 重試發(fā)消息            producer.sendMessage();        }        // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。        throw new Exception();    }}
        不過使用這種方式,要考慮到接口耗時(shí)問題,如果網(wǎng)絡(luò)異常,發(fā)送消息到RocketMQ的請求每次都到超時(shí)才返回,那么多次重試可能耗時(shí)很久,導(dǎo)致調(diào)用payOrderSuccess方法的接口超時(shí)異常。


        (2)RocketMQ事務(wù)

        RocketMQ支持事務(wù)消息機(jī)制,用事務(wù)機(jī)制保證生產(chǎn)者消息發(fā)送成功,這個(gè)方案在業(yè)內(nèi)還是比較常用的。這個(gè)方案落地之后,他可以保證你的本地事務(wù)一旦成功,那么消息必然會被投遞到MQ中去,業(yè)務(wù)系統(tǒng)的數(shù)據(jù)也是一致的。

        MQ事務(wù)機(jī)制原理還是有一點(diǎn)復(fù)雜的,放著這里講,文章篇幅會過長,所以會單獨(dú)起一篇文章講解MQ事務(wù)機(jī)制。

        不管是重試發(fā)消息的方法,還是事務(wù)機(jī)制,都會大大影響系統(tǒng)的吞吐量。

        2、broker如何確保消息不丟失?

        假如現(xiàn)在消息提交到MQ里去了,就一定不會丟失嗎?

        消息進(jìn)入MQ后會先落到磁盤上,但寫磁盤的過程,并不是一下子就寫到磁盤上的,而是先進(jìn)入os cache,再由操作系統(tǒng)的線程不定時(shí)刷到磁盤上去。

        假如此時(shí)這臺機(jī)器突然宕機(jī)了,os cache里的數(shù)據(jù)就全部丟失了,此時(shí)必然導(dǎo)致你的消息丟失。
        圖3 broker丟失消息

        那怎么去確保消息寫入MQ之后,MQ自己不要隨便丟失數(shù)據(jù)呢?

        解決這個(gè)問題的第一個(gè)關(guān)鍵點(diǎn),就是要知道broker的刷盤策略。broker的刷盤策略有兩種:異步刷盤,同步刷盤。

        異步刷盤,就是你的消息即使成功寫入了MQ,它也就在機(jī)器的os cache中,沒有進(jìn)入磁盤里,要過一會兒等操作系統(tǒng)自己把os cache里的數(shù)據(jù)實(shí)際刷入磁盤文件中去。

        所以異步刷盤模式,寫入消息的吞吐量肯定是非常高的,畢竟消息只需要進(jìn)入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丟失的風(fēng)險(xiǎn)。

        所以如果一定要確保數(shù)據(jù)零丟失的話,可以調(diào)整MQ的刷盤策略為同步刷盤。

        RocketMQ broker的默認(rèn)刷盤策略為異步刷盤,即ASYNC_FLUSH??梢詫roker的配置文件中的flushDiskType配置設(shè)置為:SYNC_FLUSH同步刷盤。

        同步刷盤之后,我們寫入MQ的每條消息,只要MQ告訴我們寫入成功了,那么就表示已經(jīng)進(jìn)入了磁盤文件了。

        同步刷盤,broker就一定不會丟失數(shù)據(jù)嗎?如果broker磁盤損壞了呢?

        接著我們就要講下,如何避免磁盤故障導(dǎo)致數(shù)據(jù)丟失。

        其實(shí)也很簡單,我們必須要對Broker使用主從架構(gòu)的模式

        也就是說,必須讓一個(gè)Master Broker有一個(gè)Slave Broker去同步它的數(shù)據(jù),而且你一條消息寫入成功,必須是讓slave Broker也寫入成功,保證數(shù)據(jù)有多個(gè)冗余的副本。
        圖4?broker主從復(fù)制

        這樣一來,你一條消息只要寫入成功了,此時(shí)主從master Broker和slave broker上都有這條數(shù)據(jù)了,此時(shí)如果你的Master Broker的磁盤損壞了,但是Slave Broker上至少還是有數(shù)據(jù)的,數(shù)據(jù)是不會因?yàn)榇疟P故障而丟失的。

        RocketMQ從4.5.0版本開始使用Dledger技術(shù)和基于Raft協(xié)議實(shí)現(xiàn),自動故障轉(zhuǎn)移,有興趣的同學(xué)可以自行去查閱相關(guān)資料。

        3 如何保證消費(fèi)者消息不丟失?

        假如消費(fèi)者拿到了消息,就一定可以成功處理嗎?

        如果消費(fèi)者從broker拿到一條信息了,但是消息目前還在它的內(nèi)存里,還沒執(zhí)行具體的業(yè)務(wù)邏輯,此時(shí)他就直接提交了這條消息的offset到broker去說自己已經(jīng)處理過了。

        接著消費(fèi)者系統(tǒng)就直接崩潰了,內(nèi)存里的消息就沒了,業(yè)務(wù)邏輯也沒執(zhí)行,結(jié)果Broker已經(jīng)收到他提交的消息offset了,還以為他已經(jīng)處理完這條消息了。

        等消費(fèi)者系統(tǒng)重啟的時(shí)候,就不會再次消費(fèi)這條消息了,因?yàn)橐呀?jīng)提交過offset,broker認(rèn)為你已經(jīng)成功消費(fèi)過這條消息了。

        所以我們在這里,我們要明確一點(diǎn),即使你保證發(fā)送消息到MQ的時(shí)候絕對不會丟失,而且MQ收到消息之后一定不會把消息搞丟失,但是你的消費(fèi)者系統(tǒng)在獲取到消息之后還是可能會搞丟。

        一般RocketMQ的消費(fèi)者中會注冊一個(gè)監(jiān)聽器,當(dāng)你的消費(fèi)者獲取到一批消息之后,就會回調(diào)你的這個(gè)監(jiān)聽器函數(shù),讓你來處理這一批消息。
        consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(          List          ConsumeConcurrentlyContext context) {              // 執(zhí)行業(yè)務(wù)邏輯return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    });
        處理完畢后,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作為消費(fèi)成功的標(biāo)志,告訴RocketMQ,這批消息我已經(jīng)處理完畢了。

        所以對于RocketMQ而言,只要你的消費(fèi)者系統(tǒng)是在這個(gè)監(jiān)聽器的函數(shù)中先處理一批消息,基于這批消息都執(zhí)行完了業(yè)務(wù)邏輯,然后返回了那個(gè)消費(fèi)成功的狀態(tài),接著才會去提交這批消息的offset到broker去。

        所以在這個(gè)情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著消費(fèi)者系統(tǒng)崩潰了,此時(shí)是不會丟失消息的。

        但是,如果是消費(fèi)者系統(tǒng)獲取到一批消息之后,還沒處理完,也就是還沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),自然沒提交這批消息的offset給broker呢,此時(shí)消費(fèi)者系統(tǒng)突然掛了,會怎么樣?

        在這種情況下,你對一批消息都沒提交他的offset給broker,broker不會認(rèn)為你已經(jīng)處理完了這批消息,此時(shí)你的消費(fèi)者系統(tǒng)的一臺機(jī)器宕機(jī)了,它其實(shí)會感知到你的消費(fèi)者系統(tǒng)的一臺機(jī)器作為一個(gè)Consumer掛了,它會把你沒處理完的那批消息交給生產(chǎn)者系統(tǒng)的其他機(jī)器去進(jìn)行處理,所以在這種情況下,消息也絕對是不會丟失的。

        在默認(rèn)的Consumer的消費(fèi)模式之下,必須是你處理完一批消息了,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),表示消息都處理結(jié)束了,去提交offset到broker去。在這種情況下,一般來說是不會丟失消息的,即使你一個(gè)Consumer宕機(jī)了,他會把你沒處理完的消息交給其他Consumer去處理。

        但是這里我們要注意一點(diǎn),就是我們不能在代碼中對消息進(jìn)行異步的處理,假如我們開啟了一個(gè)線程去處理這批消息,然后啟動線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。
        consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(          List          ConsumeConcurrentlyContext context) {              new Thread() {                  public void run() {                      // 執(zhí)行業(yè)務(wù)邏輯                              }              }.start();return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    });
        如果要是用這種方式來處理消息的話,那可能就會出現(xiàn)你開啟的線程還沒處理完消息呢,已經(jīng)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認(rèn)為已經(jīng)處理結(jié)束了。

        然后此時(shí)你消費(fèi)者系統(tǒng)突然宕機(jī),必然會導(dǎo)致你的消息丟失了!

        因此在使用RocketMQ的場景下,我們?nèi)绻WC消費(fèi)數(shù)據(jù)的時(shí)候別丟消息,你就老老實(shí)實(shí)的在回調(diào)函數(shù)里處理消息,處理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)表明你處理完畢了。

        總結(jié):

        基于RocketMQ設(shè)計(jì)一套全鏈路消息不丟失方案,需要確保生產(chǎn)者、broker、消費(fèi)者三者都不丟失數(shù)據(jù)。

        (1)生產(chǎn)者不丟失消息

        方案1:同步發(fā)送消息 +?失敗重試;
        方案2:事務(wù)消息機(jī)制;

        (2)broker不丟失消息,開啟同步刷盤策略 + 主從架構(gòu)同步機(jī)制。

        只要讓一個(gè)master Broker收到消息之后同步寫入磁盤,同時(shí)同步復(fù)制給其他slave Broker,再返回成功響應(yīng)給生產(chǎn)者,此時(shí)就可以保證MQ自己不會弄丟消息

        (3)消費(fèi)者不丟失消息,采用RocketMQ的消費(fèi)者天然就可以保證你處理完消息之后,才會提交消息的offset到broker去,不過別采用多線程異步處理消息的方式。

        雖然這一整套消息不丟失方案,可以確保消息流轉(zhuǎn)過程中不丟失。但顯而易見的是,你用了這套方案之后,會讓你整個(gè)從頭到尾的消息流轉(zhuǎn)鏈路的性能大幅度下降,讓你的MQ的吞吐量大幅度的下降。

        所以一般大家不要隨便一個(gè)業(yè)務(wù)里就上如此重的一套方案,要明白這背后的成本!

        一般我們建議,對于跟金錢、交易以及核心數(shù)據(jù)相關(guān)的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。

        而對于其他大部分沒那么核心的場景和系統(tǒng),其實(shí)即使丟失一些數(shù)據(jù),也不會導(dǎo)致太大的問題,此時(shí)可以不采取這些方案,或者說你可以在其他的場景里做一些簡化。

        有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

        歡迎大家關(guān)注Java之道公眾號


        好文章,我在看??

        瀏覽 50
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            美女被操视频在线观看 | 国产精品久久久久久久久久清纯 | 加勒比无码视频 | 把腿扒开我添30分钟视频 | 红桃视频乱伦 | 我把女的日出白液视频 | 亚洲在线 | 男女拍拍视频 j.zyme.xin | 午夜久久久 | 女人毛片18 |