1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        如何做到RabbitMQ全鏈路數(shù)據(jù)100%不丟失

        共 6822字,需瀏覽 14分鐘

         ·

        2021-11-13 15:45

        點(diǎn)擊上方 Java學(xué)習(xí)之道,選擇 設(shè)為星標(biāo)

        每天18:30點(diǎn),干貨準(zhǔn)時(shí)奉上!

        來源: blog.csdn.net/hsz2568952354/article/details/86559470
        作者: 指尖涼

        我們都知道,消息從生產(chǎn)端到消費(fèi)端消費(fèi)要經(jīng)過3個(gè)步驟:

        1. 生產(chǎn)端發(fā)送消息到RabbitMQ;
        2. RabbitMQ發(fā)送消息到消費(fèi)端;
        3. 消費(fèi)端消費(fèi)這條消息;

        這3個(gè)步驟中的每一步都有可能導(dǎo)致消息丟失,消息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統(tǒng)的可靠性。這里的可靠并不是一定就100%不丟失了,磁盤損壞,機(jī)房爆炸等等都能導(dǎo)致數(shù)據(jù)丟失,當(dāng)然這種都是極小概率發(fā)生,能做到99.999999%消息不丟失,就是可靠的了。下面來具體分析一下問題以及解決方案。

        生產(chǎn)端可靠性投遞

        生產(chǎn)端可靠性投遞,即生產(chǎn)端要確保將消息正確投遞到RabbitMQ中。生產(chǎn)端投遞的消息丟失的原因有很多,比如消息在網(wǎng)絡(luò)傳輸?shù)倪^程中發(fā)生網(wǎng)絡(luò)故障消息丟失,或者消息投遞到RabbitMQ時(shí)RabbitMQ掛了,那消息也可能丟失,而我們根本不知道發(fā)生了什么。針對(duì)以上情況,RabbitMQ本身提供了一些機(jī)制。

        事務(wù)消息機(jī)制

        事務(wù)消息機(jī)制由于會(huì)嚴(yán)重降低性能,所以一般不采用這種方法,我就不介紹了,而采用另一種輕量級(jí)的解決方案——confirm消息確認(rèn)機(jī)制。

        confirm消息確認(rèn)機(jī)制

        什么是confirm消息確認(rèn)機(jī)制?顧名思義,就是生產(chǎn)端投遞的消息一旦投遞到RabbitMQ后,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)端,讓生產(chǎn)端知道我已經(jīng)收到消息了,否則這條消息就可能已經(jīng)丟失了,需要生產(chǎn)端重新發(fā)送消息了。

        通過下面這句代碼來開啟確認(rèn)模式:

        channel.confirmSelect();// 開啟發(fā)送方確認(rèn)模式

        然后異步監(jiān)聽確認(rèn)和未確認(rèn)的消息:

        channel.addConfirmListener(new ConfirmListener() {
            //消息正確到達(dá)broker
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("已收到消息");
                //做一些其他處理
            }

            //RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("未確認(rèn)消息,標(biāo)識(shí):" + deliveryTag);
                //做一些其他處理,比如消息重發(fā)等
            }
        });

        這樣就可以讓生產(chǎn)端感知到消息是否投遞到RabbitMQ中了,當(dāng)然這樣還不夠,稍后我會(huì)說一下極端情況。

        消息持久化

        那消息持久化呢?我們知道,RabbitMQ收到消息后將這個(gè)消息暫時(shí)存在了內(nèi)存中,那這就會(huì)有個(gè)問題,如果RabbitMQ掛了,那重啟后數(shù)據(jù)就丟失了,所以相關(guān)的數(shù)據(jù)應(yīng)該持久化到硬盤中,這樣就算RabbitMQ重啟后也可以到硬盤中取數(shù)據(jù)恢復(fù)。那如何持久化呢?

        message消息到達(dá)RabbitMQ后先是到exchange交換機(jī)中,然后路由給queue隊(duì)列,最后發(fā)送給消費(fèi)端。

        所有需要給exchange、queue和message都進(jìn)行持久化:

        exchange持久化:

        //第三個(gè)參數(shù)true表示這個(gè)exchange持久化
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"true);

        queue持久化:

        //第二個(gè)參數(shù)true表示這個(gè)queue持久化
        channel.queueDeclare(QUEUE_NAME, truefalsefalsenull);

        message持久化:

        //第三個(gè)參數(shù)MessageProperties.PERSISTENT_TEXT_PLAIN表示這條消息持久化
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

        這樣,如果RabbitMQ收到消息后掛了,重啟后會(huì)自行恢復(fù)消息。

        到此,RabbitMQ提供的幾種機(jī)制都介紹完了,但這樣還不足以保證消息可靠性投遞RabbitMQ中,上面我也提到了會(huì)有極端情況,比如RabbitMQ收到消息還沒來得及將消息持久化到硬盤時(shí),RabbitMQ掛了,這樣消息還是丟失了,或者RabbitMQ在發(fā)送確認(rèn)消息給生產(chǎn)端的過程中,由于網(wǎng)絡(luò)故障而導(dǎo)致生產(chǎn)端沒有收到確認(rèn)消息,這樣生產(chǎn)端就不知道RabbitMQ到底有沒有收到消息,就不好做接下來的處理。

        所以除了RabbitMQ提供的一些機(jī)制外,我們自己也要做一些消息補(bǔ)償機(jī)制,以應(yīng)對(duì)一些極端情況。接下來我就介紹其中的一種解決方案——消息入庫。

        消息入庫

        消息入庫,顧名思義就是將要發(fā)送的消息保存到數(shù)據(jù)庫中。

        首先發(fā)送消息前先將消息保存到數(shù)據(jù)庫中,有一個(gè)狀態(tài)字段status=0,表示生產(chǎn)端將消息發(fā)送給了RabbitMQ但還沒收到確認(rèn);在生產(chǎn)端收到確認(rèn)后將status設(shè)為1,表示RabbitMQ已收到消息。這里有可能會(huì)出現(xiàn)上面說的兩種情況,所以生產(chǎn)端這邊開一個(gè)定時(shí)器,定時(shí)檢索消息表,將status=0并且超過固定時(shí)間后(可能消息剛發(fā)出去還沒來得及確認(rèn)這邊定時(shí)器剛好檢索到這條status=0的消息,所以給個(gè)時(shí)間)還沒收到確認(rèn)的消息取出重發(fā)(第二種情況下這里會(huì)造成消息重復(fù),消費(fèi)者端要做冪等性),可能重發(fā)還會(huì)失敗,所以可以做一個(gè)最大重發(fā)次數(shù),超過就做另外的處理。

        這樣消息就可以可靠性投遞到RabbitMQ中了,而生產(chǎn)端也可以感知到了。

        消費(fèi)端消息不丟失

        既然已經(jīng)可以讓生產(chǎn)端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費(fèi)端的了,如何讓消費(fèi)端不丟失消息。

        默認(rèn)情況下,以下3種情況會(huì)導(dǎo)致消息丟失:

        • 在RabbitMQ將消息發(fā)出后,消費(fèi)端還沒接收到消息之前,發(fā)生網(wǎng)絡(luò)故障,消費(fèi)端與RabbitMQ斷開連接,此時(shí)消息會(huì)丟失;
        • 在RabbitMQ將消息發(fā)出后,消費(fèi)端還沒接收到消息之前,消費(fèi)端掛了,此時(shí)消息會(huì)丟失;
        • 消費(fèi)端正確接收到消息,但在處理消息的過程中發(fā)生異?;蝈礄C(jī)了,消息也會(huì)丟失。

        其實(shí),上述3中情況導(dǎo)致消息丟失歸根結(jié)底是因?yàn)镽abbitMQ的自動(dòng)ack機(jī)制,即默認(rèn)RabbitMQ在消息發(fā)出后就立即將這條消息刪除,而不管消費(fèi)端是否接收到,是否處理完,導(dǎo)致消費(fèi)端消息丟失時(shí)RabbitMQ自己又沒有這條消息了。

        所以就需要將自動(dòng)ack機(jī)制改為手動(dòng)ack機(jī)制。

        消費(fèi)端手動(dòng)確認(rèn)消息:

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                //接收到消息,做處理
                //手動(dòng)確認(rèn)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                //出錯(cuò)處理,這里可以讓消息重回隊(duì)列重新發(fā)送或直接丟棄消息
            }
        };
        //第二個(gè)參數(shù)autoAck設(shè)為false表示關(guān)閉自動(dòng)確認(rèn)機(jī)制,需手動(dòng)確認(rèn)
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

        這樣,當(dāng)autoAck參數(shù)置為false,對(duì)于RabbitMQ服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:一部分是等待投遞給消費(fèi)端的消息;一部分是已經(jīng)投遞給消費(fèi)端,但是還沒有收到消費(fèi)端確認(rèn)信號(hào)的消息。如果RabbitMQ一直沒有收到消費(fèi)端的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)端已經(jīng)斷開連接或宕機(jī)(RabbitMQ會(huì)自己感知到),則RabbitMQ會(huì)安排該消息重新進(jìn)入隊(duì)列(放在隊(duì)列頭部),等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有能還是原來的那個(gè)消費(fèi)端,當(dāng)然消費(fèi)端也需要確保冪等性。

        好了,到此從生產(chǎn)端到RabbitMQ再到消費(fèi)端的全鏈路,就可以保證數(shù)據(jù)的不丟失。

        由于個(gè)人水平有限,有些地方可能理解錯(cuò)了或理解不到位的,請(qǐng)大家多多指出!Thanks

        -- END --

         | 更多精彩文章 -



           
                  
        加我微信,交個(gè)朋友
                 
        長按/掃碼添加↑↑↑
                        

        瀏覽 41
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产午夜福利视频在线观看| 欧美黑吊大战白妞欧美大片| 人人射视频| 中文字幕无码日韩| 一本色综合亚洲精品| 人人摸人人操人人射| 国产成人av在线观看| 国产美女福利| 国产精品美女在线观看| 亚洲免费AV在线| 亚洲欧美在线视频免费| 无码人妻精品一区二区蜜桃网站| 婷婷日逼| 激情开心站| 山东乱子伦视频国产| www.seses| 亚洲精品鲁一鲁一区二区三区| 日本三级久久| 超碰在线观看2407| 色吟av| 婷婷久草网| 欧洲毛片基地c区| 欧美欧美欧美| 久久电影无码| 成人精品A片免费网站| JlZZJLZZJlZZ亚洲女人17 | 91久久超碰| 狠狠2021| 中文字幕免费无码| 国产免费av在线| 在桌下含她的花蒂和舌头H视频| 操逼网站在线看| 国产乱码精品一品二品| 亚洲日韩欧美色图| 丝袜三级片| 少妇人妻av| 亚洲AV成人无码一区二区三区| 美女做爱视频网站| 国产精品宾馆在线| 国产精彩无码视频| 久久肉| 国产精品成人免费视频| 操逼网首页123| 色五月婷婷基地| 青草视频在线免费观看| 伊人大香蕉电影| 熟女久久| a在线观看视频| 成人激情五月天| 99视频热| 久久18| 欧美三P囗交做爰| 婷婷五月丁香六月| 九九热精品| 久久久久久久久久国产精品免费观看-百度 | 三级久久| 午夜艹| 少妇高潮av久久久久久| 国产无遮挡又黄又爽在线观看| 黄色成人视频网站在线观看| 北岛玲视频在线| 久久亚洲AV无码午夜麻豆| 日本黄色大片网站| 黄色大片在线播放| 亚洲天堂婷婷| 日韩无码免费电影| 人人鲁人人操| 亚州天堂| 性爱无码网站| 中文精品字幕人妻熟女| 成年人黄色网址| 99香蕉视频| 无码视频播放| 特黄AAAAAAAA片视频| 三级片在线观看网站| 欧美日韩AV| 大学生18一19GAY169| 91av在线观看视频| 免费黄色成人| 成人视频18+在线观看| 大香蕉三级| 北条麻妃无码一区二区| 无码国产视频| 俺也去av| 91狠狠综合久久| 人人人操| 国产美女一级特黄大片| 亚洲影院第一页在线观看| 国产精品久久7777777精品无码 | 内射学生妹J亅| 国产精品7777| 日韩一区二区三区无码电影| 影音先锋久久久| 亚洲爆乳无码一区二区三区| jizz视频| 婷婷五月天在线播放| 亚洲a在线观看| 在线免费黄色视频| 激情五月毛片| 曰韩一级A片| 午夜无码福利在线观看| 一区二区在线免费观看| 黄片视频国产| 久久艹国产| 国产一级a爱做片免费☆观看| 亚洲综合小说| 炮友露脸青楼传媒刘颖儿| 黄色大片av| 九九成人免费视频| 超碰成人欧美| 亚洲二区在线| 手机成人在线视频| 日韩一二三区| 波多野结衣不卡| 成人视频123| 亚洲成人电影AV| 日韩中文字幕视频在线| 精东av| 久久午夜一级A片| 日韩中文字幕人妻| 中文字幕乱| 久热久热| 超碰在线观看2407| 国产思思99re99在线观看| 四川少妇搡bbw搡bbbb| 欧美精品在线观看视频| 黄片免费在线播放| 亚洲福利片| 日本不卡视频| 五月花在线视频| 亚洲天堂三级片| 精品中文字幕在线| 91麻豆精品国产91久久久吃药| 亚洲黄色免费在线观看| 亚洲天堂无码在线| 免费的黄色A片| 日韩干| 大香蕉在线啪啪| 懂色av蜜臀av粉嫩av分享| 久草婷婷| 91在线精品秘一区二区黄瓜| 日韩三级片AV| 免费的黄色视频网站| 五月婷视频| 国产精品国产精品| 一级片| 亚洲中文字幕在线观看视频| 九九九色视频| 视频一区二| 想要xx在线观看| 中文字幕视频在线播放| 国产在线观看免费视频| 日韩视频一二三| 99精品亚洲| 成人网站www污污污网站公司| 在线毛片网站| 男人的天堂黄色| 一本大道香蕉av久久精东影业 | 日本中文字幕无码| 91亚洲精品久久久久蜜桃| 午夜理论在线| 国产成人精品免费看视频| 中文字幕第315页| 欧美在线一区二区三区| 中出欧美亚洲| 大香蕉电影网站| 三p视频| 日本国产精品| 99热视| 久久无码影院| 97久久综合| 微熟女地址导航| 天天草天天射| 国产精品久久AV电影| 95四川乱子伦视频国产| 色播一区| 夜夜嗨AV一区二区三区| 东京热综合| 人人爱人人爽人人操| 免费三级片网址| 国产精品国产精品| 草逼片| 亚洲黄色无码视频| 北京熟妇搡BBBB搡BBBB电影 | 射射AV| 青青草伊人大香蕉| 日本黄色一级| 日本人妻A片成人免费看片| 一区二区成人免费视频| 性BBW| AV在线资源网| 亚洲欧美成人在线| 欧美特黄AAA| 青青草原成人在线视频| 免费无码一区| 国产激情视频在线| 五月天四房播播| 无码AV网| 欧美日韩99| 亚洲无码A片在线观看APP| 亚洲福利网| 国产乱码一区二区三区的区别| 强辱丰满人妻HD中文字幕| 亚洲精品一区二区二区的游戏情况 | 九九九亚洲| 成人网一区二区| 亚洲一区二区久久| 国产AV一二三区| 国产乱子伦-区二区三区| 亚洲A片免费看| 久久偷看各类wc女厕嘘嘘偷窃| 97自拍视频| 91就要爱爱视频| 一级a一级a爰片免费免免中国A片 一级一级a免一级a做免费线看内裤 | 大香蕉国产精品视频| 亚洲在线播放| 电家庭影院午夜| 乱子伦国产精品视频一级毛| 女生自慰网站在线观看| 亚洲精品一区二区二区的游戏情况 | 亚洲精品欧美| 熟女人妻一区二区三区免费看| 五月婷视频| 亚洲AV无码电影| 懂色在线精品分类视频| 操BBB操BBB| 无码操B| 欧美视频一区| 亚洲免费视频网站| 久久久夜夜夜| 爱搞搞就要搞| 亚洲国产免费视频| 91九色91蝌蚪91成人| 欧美成人久久| 国产欧美综合一区| 日韩区在线| 成人精品午夜无码免费| 色色一级| 亚洲自拍中文字幕| 97人人草| 久久电影无码| 婷婷国产综合| 色噜噜在线| 欧美大香蕉伊人网| 久久国产一区| 色色看片| 天天狠天天干| 黄片国产| 欧美视频精品| 天天操天天干麻豆| 成人午夜精品无码区| 久久无码一区| 黄色视频免费观看| 无码精品成人观看A片| 久久亚洲天堂| 亚洲内射网| 亚洲AV无码精品久久一区二区| 久久综合中文字幕| 亚洲欧美日韩在线| 免费欧美性爱| 91久久欧美极品XXXXⅩ| 婷婷少妇激情| 亚洲区欧美区| 黄片av| 国产伦精品一级A片视频夜夜| 人人爽人人| gogogo高清在线观看免费直播中国 | 久久99视频免费观看| 怡红院欧美| 国产特級黃色大片| 日本一级黄色| 美女综合网| 久久久久久成人无码| 99久久夜色精品国产亚洲| 免费a片在线观看| 精品国产免费无码久久噜噜噜AV| 亚洲精品久久久久avwww潮水 | 性无码专区| 免费高清无码视频在线观看| 成年人免费视频网站| 国产亚洲久一区二区| 欧美v亚洲| 91色婷婷综合久久中文字幕二区 | 国产操屄视频| 操逼黄视频| 亚洲视频免费在线| 大香蕉大香蕉大香蕉| 精品亚洲一区二区三区四区五区| 亚洲成人三级片| 国内精品人妻无码久久久影院蜜桃| 国产黄色一级| 婷婷色网| 无码国产精品一区二区视频| 在线中文av| 大香蕉操B| 美腿丝袜中文字幕精品| 91视频网| 3p视频网站| 一区二区三区电影高清电影免费观看 | 国产黄色网页|