1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        面試官:消息隊(duì)列消息可靠性、重復(fù)消息、消息積壓、利用消息實(shí)現(xiàn)分布式事務(wù)

        共 7757字,需瀏覽 16分鐘

         ·

        2021-12-23 05:25


        -? ? ?如何確保消息不丟失? ? -?


        1、檢測(cè)消息丟失的方法

        可以利用消息隊(duì)列的有序性來(lái)驗(yàn)證是否有消息丟失。在Producer端給每個(gè)發(fā)出的消息附加一個(gè)連續(xù)遞增的序號(hào),然后在Consumer端來(lái)檢查這個(gè)序號(hào)的連續(xù)性。如果沒(méi)有消息丟失,Consumer收到消息的序號(hào)必然是連續(xù)遞增的,如果檢測(cè)到序號(hào)不連續(xù),那就是丟消息了。還可以通過(guò)缺失的序號(hào)來(lái)確定丟失的是哪條消息,方便進(jìn)一步排查原因。

        大多數(shù)消息隊(duì)列的 客戶端都支持?jǐn)r截器機(jī)制,可以利用這個(gè)攔截器機(jī)制,在Producer發(fā)送消息之前的攔截器中將序號(hào)注入到消息中,在Consumer收到消息的攔截器中檢測(cè)序號(hào)的連續(xù)性。

        如果是在一個(gè)分布式系統(tǒng)中實(shí)現(xiàn)這個(gè)檢測(cè)方法,有幾個(gè)問(wèn)題需要注意:


        首先,像Kafka和RocketMQ這樣的消息隊(duì)列,是不保證Topic上的嚴(yán)格順序的,只能保證分區(qū)上的消息是有序的,所以在發(fā)消息的時(shí)候必須指定分區(qū),并且,在每個(gè)分區(qū)單獨(dú)檢測(cè)消息序號(hào)的連續(xù)性。

        如果系統(tǒng)中Producer是多實(shí)例的,由于并不好協(xié)調(diào)多個(gè)Producer之間的發(fā)送順序,所以也需要每個(gè)Producer分別生成各自的消息序號(hào),并且需要附加上Producer的標(biāo)識(shí),在Consumer端按照每個(gè)Producer分別來(lái)檢測(cè)序號(hào)的連續(xù)性Consumer實(shí)例的數(shù)量最好和分區(qū)數(shù)量一致,做到Consumer和分區(qū)一一對(duì)應(yīng),這樣會(huì)比較方便地在Consumer內(nèi)檢測(cè)消息序號(hào)的連續(xù)性。

        2、確保消息可靠傳遞


        一條消息從生產(chǎn)到消費(fèi)完成這個(gè)過(guò)程,可以劃分為三個(gè)階段:


        (1)生產(chǎn)階段:在這個(gè)階段,從消息在Producer創(chuàng)建出來(lái),經(jīng)過(guò)網(wǎng)絡(luò)傳輸發(fā)送到Broker端;
        (2)存儲(chǔ)階段:在這個(gè)階段,消息在Broker端存儲(chǔ),如果是集群,消息會(huì)在這個(gè)階段被復(fù)制到其他的副本上;
        (3)消費(fèi)階段:在這個(gè)階段,Consumer從Broker上拉取消息,經(jīng)過(guò)網(wǎng)絡(luò)傳輸發(fā)送到Consumer上。

        2.1、生產(chǎn)階段

        在生產(chǎn)階段,消息隊(duì)列通過(guò)最常用的請(qǐng)求確認(rèn)機(jī)制,來(lái)保證消息的可靠傳遞:當(dāng)在代碼中調(diào)用發(fā)送消息方法時(shí),消息隊(duì)列的客戶端會(huì)把消息發(fā)送到Broker,Broker收到消息后,會(huì)給客戶端返回一個(gè)確認(rèn)響應(yīng),表明消息已經(jīng)收到了??蛻舳耸盏巾憫?yīng)后,完成了一次正常消息的發(fā)送。

        只要Producer收到了Broker的確認(rèn)響應(yīng)就可以保證消息在生產(chǎn)階段不會(huì)丟失。有些消息隊(duì)列在長(zhǎng)時(shí)間沒(méi)收到發(fā)送確認(rèn)響應(yīng)后,會(huì)自動(dòng)重試,如果重試再失敗,就會(huì)以返回值或者異常的方式告知用戶。

        在編寫(xiě)發(fā)送消息代碼時(shí),需要注意,正確處理返回值或者捕獲異常,就可以保證這個(gè)階段的消息不會(huì)丟失。

        以 Kafka 為例:

        同步發(fā)送時(shí),只要注意捕獲異常即可。

        1. try{

        2. producer.send(record).get();

        3. System.out.println("消息發(fā)送成功");

        4. } catch(Exception e) {

        5. System.out.println("消息發(fā)送失敗");

        6. System.out.println(e);

        7. }



        異步發(fā)送時(shí),則需要在回調(diào)方法里進(jìn)行檢查:


        1. producer.send(record, newCallback() {

        2. @Override

        3. publicvoid onCompletion(RecordMetadata metadata, Exception exception) {

        4. if(metadata != null) {

        5. System.out.println("消息發(fā)送成功");

        6. } else{

        7. System.out.println("消息發(fā)送失敗");

        8. System.out.println(exception);

        9. }

        10. }

        11. });

        1. producer.send(record, (metadata, exception) -> {

        2. if(metadata != null) {

        3. System.out.println("消息發(fā)送成功");

        4. } else{

        5. System.out.println("消息發(fā)送失敗");

        6. System.out.println(exception);

        7. }

        8. });




        2.2、存儲(chǔ)階段

        在存儲(chǔ)階段正常情況下,只要Broker在正常運(yùn)行,就不會(huì)出現(xiàn)丟失消息的問(wèn)題,但是如果Broker出現(xiàn)了故障,比如進(jìn)程死掉了或者服務(wù)器宕機(jī)了,還是可能會(huì)丟失消息的。

        如果對(duì)消息的可靠性要求非常高,可以通過(guò)配置Broker參數(shù)來(lái)避免因?yàn)殄礄C(jī)丟消息。

        對(duì)于單個(gè)節(jié)點(diǎn)的Broker,需要配置Broker參數(shù),在收到消息后,將消息寫(xiě)入磁盤(pán)后再給Producer返回確認(rèn)響應(yīng),這樣即使發(fā)生宕機(jī),由于消息已經(jīng)被寫(xiě)入磁盤(pán),就不會(huì)丟失消息,恢復(fù)后還可以繼續(xù)消費(fèi)。例如,在RocketMQ中,需要將刷盤(pán)方式flushDiskType配置為SYNC_FLUSH同步刷盤(pán)。

        如果Broker是由多個(gè)節(jié)點(diǎn)組成的集群,需要將Broker集群配置成:至少將消息發(fā)送到2個(gè)以上的節(jié)點(diǎn),再給客戶端回復(fù)發(fā)送確認(rèn)響應(yīng)。這樣當(dāng)某個(gè)Broker宕機(jī)后,其他的Broker可以替代宕機(jī)的Broker,也不會(huì)發(fā)生消息丟失。

        2.3、消費(fèi)階段

        消費(fèi)階段采用和生產(chǎn)階段類似的確認(rèn)機(jī)制來(lái)保證消息的可靠傳遞,客戶端從Broker拉取消息后,執(zhí)行用戶的消費(fèi)業(yè)務(wù)邏輯。

        成功后,才會(huì)給Broker發(fā)送消費(fèi)確認(rèn)響應(yīng)。如果Broker沒(méi)有收到消費(fèi)確認(rèn)響應(yīng),下次拉消息的時(shí)候還會(huì)返回同一條消息,確認(rèn)消息不會(huì)在網(wǎng)絡(luò)傳輸過(guò)程中丟失,也不會(huì)因?yàn)榭蛻舳嗽趫?zhí)行消費(fèi)邏輯中出錯(cuò)導(dǎo)致丟失。

        在編寫(xiě)消費(fèi)代碼時(shí)需要注意的是,不要在收到消息后就立即發(fā)送消費(fèi)確認(rèn),而是應(yīng)該在執(zhí)行完所有消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。

        以 SpringBoot 整合 RabbitMQ 為例:



        -? ? ?小結(jié)? ? -?


        在生產(chǎn)階段,需要捕獲消息發(fā)送的錯(cuò)誤,并重發(fā)消息2、在存儲(chǔ)階段,可以通過(guò)配置刷盤(pán)和復(fù)制相關(guān)的參數(shù),讓消息寫(xiě)入到多個(gè)副本的磁盤(pán)上,來(lái)確保消息不會(huì)因?yàn)槟硞€(gè)Broker宕機(jī)或者磁盤(pán)損壞而丟失3、在消費(fèi)階段,需要在處理完全部消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。


        -? ? ?如何處理消費(fèi)過(guò)程中的重復(fù)消息? ? -?


        1、消息重復(fù)的情況必然存在


        在MQTT協(xié)議中,給出了三種傳遞消息時(shí)能夠提供的服務(wù)質(zhì)量標(biāo)準(zhǔn),這三種服務(wù)質(zhì)量從低到高依次是:

        At most once:至多一次。消息在傳遞時(shí),最多會(huì)被送達(dá)一次。也就是說(shuō),沒(méi)什么消息可靠性保證,允許丟消息。一般都是一些對(duì)消息可靠性要求不太高的監(jiān)控場(chǎng)景使用,比如每分鐘上報(bào)一次機(jī)房溫度數(shù)據(jù),可以接受數(shù)據(jù)少量丟失。

        At least once:至少一次。消息在傳遞時(shí),至少會(huì)被送達(dá)一次。也就是說(shuō),不允許丟消息,但是允許有少量重復(fù)消息出現(xiàn)。關(guān)注 我是程序

        Exactly once:恰好一次。消息在傳遞時(shí),只會(huì)被送達(dá)一次,不允許丟失也不允許重復(fù),這個(gè)是最高的等級(jí) 這個(gè)服務(wù)質(zhì)量標(biāo)準(zhǔn)不僅適用于MQTT,對(duì)所有的消息隊(duì)列都是適用的?,F(xiàn)在常用的絕大部分消息隊(duì)列提供的服務(wù)質(zhì)量都是 At east once,包括RocketMQ、RabbitMQ和Kafka都是這樣。也就是說(shuō),消息隊(duì)列很難保證消息不重復(fù)。

        2、用冪等性解決重復(fù)消息問(wèn)題


        一般解決重復(fù)消息的辦法是,在消費(fèi)端,讓我們消費(fèi)消息的操作具備冪等性。一個(gè)冪等操作的特點(diǎn)是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。一個(gè)冪等的方法使用同樣的參數(shù),對(duì)它進(jìn)行多次調(diào)用和一次調(diào)用,對(duì)系統(tǒng)產(chǎn)生的影響是一樣的。所以,對(duì)于冪等的方法,不用擔(dān)心重復(fù)執(zhí)行會(huì)對(duì)系統(tǒng)造成任何改變。

        從對(duì)系統(tǒng)的影響結(jié)果來(lái)說(shuō):At least once+冪等消費(fèi)=Exactly once

        幾種常用的設(shè)計(jì)冪等操作的方法


        1、利用數(shù)據(jù)庫(kù)的唯一約束實(shí)現(xiàn)冪等
        舉個(gè)例子:將賬戶X的余額加100元。可以通過(guò)改造業(yè)務(wù)邏輯,讓它具備冪等性
        首先,可以限定對(duì)于每個(gè)轉(zhuǎn)賬單每個(gè)賬戶只可以執(zhí)行一次變更操作,最簡(jiǎn)單的是在數(shù)據(jù)庫(kù)建一張轉(zhuǎn)賬流水表,這個(gè)表有三個(gè)字段:轉(zhuǎn)賬單ID、賬戶ID和變更金額,然后給轉(zhuǎn)賬單ID和賬戶ID這兩個(gè)字段聯(lián)合起來(lái)創(chuàng)建一個(gè)唯一約束,這樣對(duì)于相同的轉(zhuǎn)賬單ID和賬戶ID,表里至多只能存在一條記錄。

        這樣,消費(fèi)消息的邏輯可以變?yōu)椋涸谵D(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可。在轉(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個(gè)操作中,由于在這個(gè)表中預(yù)先定義了賬戶ID轉(zhuǎn)賬單ID的唯一索引,對(duì)于同一個(gè)轉(zhuǎn)賬單同一個(gè)賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會(huì)失敗,這樣就實(shí)現(xiàn)了一個(gè)冪等的操作。

        只要是支持類似INSERT IF NOT EXIST語(yǔ)義的存儲(chǔ)類系統(tǒng)都可以用于實(shí)現(xiàn)冪等,比如,可以用Redis的SETNX命令來(lái)替代數(shù)據(jù)庫(kù)中的唯一約束,來(lái)實(shí)現(xiàn)冪等消費(fèi)。

        2、為更新的數(shù)據(jù)設(shè)置前置條件
        另外一種實(shí)現(xiàn)冪等的思路是,給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時(shí)候,同時(shí)變更前置條件中需要判斷的數(shù)據(jù)。這樣,重復(fù)執(zhí)行這個(gè)操作時(shí),由于第一次更新數(shù)據(jù)的時(shí)候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會(huì)重復(fù)執(zhí)行更新數(shù)據(jù)操作。

        比如,將賬戶X的余額增加100元這個(gè)操作并不滿足冪等性,可以把這個(gè)操作加上一個(gè)前置條件,變?yōu)椋喝绻~戶X當(dāng)前的月為500元,將余額加100元,這個(gè)操作就具備了冪等性。對(duì)應(yīng)到消息隊(duì)列中的使用時(shí),可以在發(fā)消息時(shí)在消息體中帶上當(dāng)前的余額,在消費(fèi)的時(shí)候判斷數(shù)據(jù)庫(kù)中當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。

        更加通用的方法是,給數(shù)據(jù)增加一個(gè)版本號(hào)屬性,每次更新數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號(hào)是否和消息中的版本號(hào)一直,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時(shí)將版本號(hào)+1,一樣可以實(shí)現(xiàn)冪等更新。

        3、記錄并檢查操作
        還有一種通用性最強(qiáng)的實(shí)現(xiàn)冪等性方法:記錄并檢查操作,也稱為T(mén)oken機(jī)制或者GUID(全局唯一ID)機(jī)制,實(shí)現(xiàn)思路:在執(zhí)行數(shù)據(jù)更新操作之前,先檢查一下是否執(zhí)行過(guò)這個(gè)更新操作。

        具體的實(shí)現(xiàn)方法是,在發(fā)送消息時(shí),給每條消息指定一個(gè)全局唯一的ID,消費(fèi)時(shí),先根據(jù)這個(gè)ID檢查這條消息是否有被消費(fèi)過(guò),如果沒(méi)有消費(fèi)過(guò),才更新數(shù)據(jù),然后將消費(fèi)狀態(tài)置為已消費(fèi)。

        但在分布式系統(tǒng)中,這個(gè)方法非常難以實(shí)現(xiàn)。首先,給每個(gè)消息指定一個(gè)全局唯一的ID就是一件不那么簡(jiǎn)單的事情,方法有很多,但都不太好同時(shí)滿足簡(jiǎn)單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,檢查消費(fèi)狀態(tài),然后更新數(shù)據(jù)并且設(shè)置消費(fèi)狀態(tài)這三個(gè)操作必須作為一組操作保證原子性,才能真正實(shí)現(xiàn)冪等,否則就會(huì)出現(xiàn)Bug。


        -? ? ?如何處理消息積壓?? ? -?


        消息積壓的直接原因一定是系統(tǒng)中的某個(gè)部分出現(xiàn)了性能問(wèn)題,來(lái)不及處理上游發(fā)送的消息,才會(huì)導(dǎo)致消息積壓。

        優(yōu)化性能來(lái)避免消息積壓


        1、發(fā)送端性能優(yōu)化
        對(duì)于發(fā)送消息的業(yè)務(wù)邏輯,只需要設(shè)置合適的并發(fā)和批量大小,就可以達(dá)到很多的發(fā)送性能。

        Producer發(fā)送消息的過(guò)程包括:Producer發(fā)送消息給Broker,Broker收到消息返回確認(rèn)響應(yīng)。假設(shè)這一次交互的平均時(shí)延是1ms,這1ms包括了下面這些步驟的耗時(shí):
        • 發(fā)送端準(zhǔn)備數(shù)據(jù)、序列化消息、構(gòu)造請(qǐng)求等邏輯的時(shí)間,也就是發(fā)送端在網(wǎng)絡(luò)請(qǐng)求之前的耗時(shí);

        • 發(fā)送消息和返回響應(yīng)在網(wǎng)絡(luò)傳輸中的耗時(shí);

        • Broker處理消息的時(shí)延。


        如果是單線程發(fā)送,每次只發(fā)送1條消息,那么每秒只能發(fā)送1000ms/1ms*1條/ms=1000條消息。無(wú)論是增加每次發(fā)送消息的批量大小,還是增加并發(fā)都能成倍地提升發(fā)送性能。關(guān)注 我是程序

        比如說(shuō),消息發(fā)送端主要接收RPC請(qǐng)求處理在線業(yè)務(wù),因?yàn)樗蠷PC框架都是多線程支持多并發(fā)的,自然就實(shí)現(xiàn)了并行發(fā)送消息。并且在線業(yè)務(wù)比較在意的是請(qǐng)求響應(yīng)時(shí)延,選擇批量發(fā)送會(huì)影響RPC服務(wù)的時(shí)延。

        如果是一個(gè)離線系統(tǒng),它在性能上更注重整個(gè)系統(tǒng)的吞吐量,發(fā)送端的數(shù)據(jù)都是來(lái)自于數(shù)據(jù)庫(kù),這種情況就更適合批量發(fā)送??梢耘繌臄?shù)據(jù)庫(kù)讀取數(shù)據(jù),然后批量來(lái)發(fā)送消息,同樣用少量的并發(fā)就可以獲得非常高的吞吐量。

        2、消費(fèi)端性能優(yōu)化
        使用消息隊(duì)列的時(shí)候,大部分的性能問(wèn)題都出現(xiàn)在消費(fèi)端,如果消費(fèi)的速度跟不上發(fā)送生產(chǎn)消息的速度,就會(huì)造成消息積壓。如果這種性能倒掛的問(wèn)題只是暫時(shí)的,只要消費(fèi)單的性能恢復(fù)之后,超過(guò)發(fā)送端的性能,那積壓的消息是可以逐漸被消化掉的。

        要是消費(fèi)速度一直比生產(chǎn)速度慢,時(shí)間長(zhǎng)了,整個(gè)系統(tǒng)就會(huì)出現(xiàn)問(wèn)題,要么,消息隊(duì)列的存儲(chǔ)被填滿無(wú)法提供服務(wù),要么消息丟失,這對(duì)于整個(gè)系統(tǒng)來(lái)說(shuō)都是嚴(yán)重故障。

        在設(shè)計(jì)系統(tǒng)的時(shí)候,一定要保證消費(fèi)端的消費(fèi)性能要高于生產(chǎn)端的發(fā)送性能。

        消費(fèi)端的性能優(yōu)化除了優(yōu)化消費(fèi)業(yè)務(wù)邏輯之外,也可以通過(guò)水平擴(kuò)容,增加消費(fèi)端的并發(fā)數(shù)來(lái)提升總體的消費(fèi)性能。在擴(kuò)容Consumer的實(shí)例數(shù)量的同時(shí),必須同步擴(kuò)容主題中的分區(qū)數(shù)量,確保Consumer的實(shí)例數(shù)和分區(qū)數(shù)量是相等的。如果Consumer的實(shí)例數(shù)量超過(guò)分區(qū)數(shù)量,這樣的擴(kuò)容是無(wú)效的。關(guān)注 我是程序

        消息積壓了該如何處理?


        還有一種消息積壓的情況是,日常系統(tǒng)正常運(yùn)轉(zhuǎn)的時(shí)候,沒(méi)有積壓或者只有少量積壓很快就消費(fèi)掉了,但是某一時(shí)刻,突然就開(kāi)始積壓消息并且積壓持續(xù)上漲。這種情況下需要在短時(shí)間內(nèi)找到消息積壓的原因,迅速解決問(wèn)題。

        能導(dǎo)致積壓突然增加,最粗粒度的原因,只有兩種:要么是發(fā)送變快了,要么是消費(fèi)變慢了。

        大部分消息隊(duì)列都內(nèi)置了監(jiān)控的功能,只要通過(guò)監(jiān)控?cái)?shù)據(jù),很容易確定是哪種原因。如果是單位事件發(fā)送的消息增多,比如說(shuō)是趕上大促或者搶購(gòu),短時(shí)間內(nèi)不太可能優(yōu)化消費(fèi)端的代碼來(lái)提升消費(fèi)性能,唯一的方法是通過(guò)擴(kuò)容消費(fèi)端的實(shí)例來(lái)提升總體的消費(fèi)能力。

        如果短時(shí)間內(nèi)沒(méi)有足夠的服務(wù)器資源進(jìn)行擴(kuò)容,沒(méi)辦法的辦法是將系統(tǒng)降級(jí),通過(guò)關(guān)閉一些不重要的業(yè)務(wù),減少發(fā)送方發(fā)送的數(shù)據(jù)量,最低限度讓系統(tǒng)還能正常運(yùn)轉(zhuǎn),服務(wù)一些重要業(yè)務(wù)。

        還有一種不太常見(jiàn)的情況,通過(guò)監(jiān)控發(fā)現(xiàn),無(wú)論是發(fā)送消息的速度還是消費(fèi)消息的速度和原來(lái)都沒(méi)什么變化,這時(shí)候需要檢查一下消費(fèi)端是不是消費(fèi)失敗導(dǎo)致的一條消息發(fā)福消費(fèi)這種情況比較多,這種情況也會(huì)拖垮整個(gè)系統(tǒng)的消費(fèi)速度。


        -? ? ?利用事務(wù)消息實(shí)現(xiàn)分布式事務(wù)? ? -?


        消息隊(duì)列中的事務(wù),主要解決的是:消息生產(chǎn)者和消息消費(fèi)者的數(shù)據(jù)一致性問(wèn)題。

        拿電商來(lái)舉個(gè)例子,一般來(lái)說(shuō),用戶在電商APP上購(gòu)物時(shí),先把商品加到購(gòu)物車?yán)?,然后幾件商品一起下單,最后支付,完成?gòu)物流程,就可以等待收貨了。

        這個(gè)過(guò)程中有一個(gè)需要用到消息隊(duì)列的步驟,訂單系統(tǒng)創(chuàng)建訂單后,發(fā)消息給購(gòu)物車系統(tǒng),將已下單的商品從購(gòu)物車中刪除。因?yàn)閺馁?gòu)物車刪除已下單商品這個(gè)步驟,并不是用戶下單支付這個(gè)主要流程中必需的步驟,使用消息隊(duì)里來(lái)異步清理購(gòu)物車是更加合理的設(shè)計(jì)。


        對(duì)于訂單系統(tǒng)來(lái)說(shuō),它創(chuàng)建訂單的過(guò)程中實(shí)際上執(zhí)行了2個(gè)步驟的操作:

        1、在訂單庫(kù)中插入一條訂單數(shù)據(jù),創(chuàng)建訂單;
        2、發(fā)消息給消息隊(duì)列,消息的內(nèi)容就是剛剛創(chuàng)建的訂單。

        購(gòu)物車系統(tǒng)訂閱相應(yīng)的主題,接收訂單創(chuàng)建的消息,然后清理購(gòu)物車,在購(gòu)物車中刪除訂單中的商品。

        問(wèn)題的關(guān)鍵點(diǎn)集中在訂單系統(tǒng),創(chuàng)建訂單和發(fā)送消息這兩個(gè)步驟要么都操作成功,要么都操作失敗,不允許一個(gè)成功而另一個(gè)失敗的情況出現(xiàn)。

        什么是分布式事務(wù)?


        事務(wù)的4個(gè)特性(ACID):

        • 原子性:指一個(gè)事務(wù)操作不可分割,要么成功,要么失敗,不能有一半成功一半失敗的情況。


        • 一致性:指這些數(shù)據(jù)在事務(wù)執(zhí)行完成這個(gè)時(shí)間點(diǎn)之前,讀到的一定是更新前的數(shù)據(jù),之后讀到的一定是更新后的數(shù)據(jù),不應(yīng)該存在一個(gè)時(shí)刻,讓用戶讀到更新過(guò)程中的數(shù)據(jù)。


        • 隔離性:指一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)正在進(jìn)行的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。


        • 持久性:指一個(gè)事務(wù)一旦完成提交,后續(xù)的其他操作和故障都不會(huì)對(duì)事務(wù)的結(jié)果產(chǎn)生任何影響 事務(wù)消息適用的場(chǎng)景主要是那些需要異步更新數(shù)據(jù),并且對(duì)數(shù)據(jù)實(shí)時(shí)性要求不太高的場(chǎng)景。比如訂單系統(tǒng)的例子,在創(chuàng)建訂單后,如果出現(xiàn)短暫的幾秒,購(gòu)物車?yán)锏纳唐窙](méi)有及時(shí)情況,也不是完全不可接受的,只要最終購(gòu)物車的數(shù)據(jù)和訂單數(shù)據(jù)保持一致就可以了。


        2、消息隊(duì)列是如何實(shí)現(xiàn)分布式事務(wù)的?


        回到訂單和購(gòu)物車這個(gè)例子,來(lái)看下如何用消息隊(duì)列來(lái)實(shí)現(xiàn)分布式事務(wù):


        首先,訂單系統(tǒng)在消息隊(duì)列上開(kāi)啟了一個(gè)事務(wù)。然后訂單系統(tǒng)給消息服務(wù)器發(fā)送一個(gè)半消息,這個(gè)半消息包含的內(nèi)容是完整的消息內(nèi)容,和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對(duì)于消費(fèi)者來(lái)說(shuō),這個(gè)消息是不可見(jiàn)的。

        半消息發(fā)送成功后,訂單系統(tǒng)就可以執(zhí)行本地事務(wù)了,在訂單庫(kù)中創(chuàng)建一條訂單記錄,并提交訂單庫(kù)的數(shù)據(jù)庫(kù)事務(wù)。然后根據(jù)本地事務(wù)的執(zhí)行結(jié)果決定提交或者回滾事務(wù)消息。

        如果訂單創(chuàng)建成功,那就提交事務(wù)消息,購(gòu)物車系統(tǒng)就可以消費(fèi)到這條消息繼續(xù)后續(xù)的流程。如果訂單創(chuàng)建失敗,那就回滾事務(wù)消息,購(gòu)物車系統(tǒng)就不會(huì)收到這條消息。這樣就基本實(shí)現(xiàn)了要么都成功,要么都失敗的一致性要求。

        如果在第四步提交事務(wù)消息時(shí)失敗了,Kafka會(huì)直接拋出異常,讓用戶自行處理,可以在業(yè)務(wù)代碼中反復(fù)重試提交,直到提交成功,或者刪除之前創(chuàng)建的訂單進(jìn)行補(bǔ)償。

        3、RocketMQ中的分布式事務(wù)實(shí)現(xiàn)


        在RocketMQ中的事務(wù)實(shí)現(xiàn)中,增加了事務(wù)反查的機(jī)制來(lái)解決事務(wù)消息提交失敗的問(wèn)題。

        如果Producer也就是訂單系統(tǒng),在提交或者回滾事務(wù)消息時(shí)發(fā)生網(wǎng)絡(luò)異常,RocketMQ的Broker沒(méi)有收到提交或者回滾的請(qǐng)求,Broker會(huì)定期去Producer上反查這個(gè)事務(wù)對(duì)應(yīng)的本地事務(wù)的狀態(tài),然后根據(jù)反查結(jié)果決定提交或者回滾這個(gè)事務(wù)。

        為了支撐這個(gè)事務(wù)反查機(jī)制,業(yè)務(wù)代碼中需要實(shí)現(xiàn)一個(gè)反查本地事務(wù)狀態(tài)的接口,告知RocketMQ本地事務(wù)是成功還是失敗。

        在訂單系統(tǒng)的例子中,反查本地事務(wù)的邏輯只要根據(jù)消息中的訂單ID,在訂單庫(kù)中查詢這個(gè)訂單是否存在即可,如果訂單存在則返回成功,否則返回失敗。

        RocketMQ會(huì)自動(dòng)根據(jù)事務(wù)反查的結(jié)果,提交或者回滾事務(wù)消息。

        這個(gè)反查本地事務(wù)的實(shí)現(xiàn),并不依賴消息的發(fā)送方,也就是訂單服務(wù)的某個(gè)實(shí)例節(jié)點(diǎn)上的任何數(shù)據(jù)。

        這種情況下,即使是發(fā)送事務(wù)消息的那個(gè)訂單服務(wù)節(jié)點(diǎn)宕機(jī)了,RocketMQ依然可以通過(guò)其他訂單服務(wù)的節(jié)點(diǎn)來(lái)執(zhí)行反查,確保事務(wù)的完整性。

        使用RocketMQ事務(wù)消息功能實(shí)現(xiàn)分布式事務(wù)的流程如下圖:


        者:邋遢的流浪劍客

        來(lái)源:

        https://blog.csdn.net/qq_40378034/article/details/98790433

        程序汪資料鏈接

        程序汪接的7個(gè)私活都在這里,經(jīng)驗(yàn)整理

        Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 06版

        堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門(mén)到實(shí)戰(zhàn)進(jìn)階

        臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開(kāi)放下載!

        臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開(kāi)放下載!

        字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開(kāi)放下載!


        歡迎添加程序汪個(gè)人微信 itwang005? 進(jìn)粉絲群或圍觀朋友圈

        瀏覽 74
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            欧美a精品| 免费一级婬A片久久久爽死你网站 | bdsm性折磨bdsm电击 | 大香伊人 | 色婷婷六月天 | 丁香五月激情六月 | 在线主播精品国产 | 成人AV一区二区三区婷婷 | 99re热精品视频 | 丝袜老师踩我的喷水 |