1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        消息隊列消息丟失和消息重復發(fā)送的處理策略

        共 7051字,需瀏覽 15分鐘

         ·

        2022-07-04 16:03

        來源:https://www.jianshu.com/p/533fc6fc0963

        分布式事務

        什么是分布式事務

        我們的服務器從單機發(fā)展到擁有多臺機器的分布式系統(tǒng),各個系統(tǒng)之前需要借助于網絡進行通信,原有單機中相對可靠的方法調用以及進程間通信方式已經沒有辦法使用,同時網絡環(huán)境也是不穩(wěn)定的,造成了我們多個機器之間的數(shù)據(jù)同步問題,這就是典型的分布式事務問題。

        在分布式事務中事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點之上。分布式事務就是要保證不同節(jié)點之間的數(shù)據(jù)一致性。

        常見的分布式事務解決方案

        1、2PC(二階段提交)方案 - 強一致性

        2、3PC(三階段提交)方案

        3、TCC (Try-Confirm-Cancel)事務 - 最終一致性

        4、Saga事務 - 最終一致性

        5、本地消息表 - 最終一致性

        6、MQ事務 - 最終一致性

        這里重點關注下使用消息隊列實現(xiàn)分布式的一致性,上面幾種的分布式設計方案的具體細節(jié)可參見文章最后的引用鏈接

        基于 MQ 實現(xiàn)的分布式事務

        本地消息表-最終一致性

        消息的生產方,除了維護自己的業(yè)務邏輯之外,同時需要維護一個消息表。這個消息表里面記錄的就是需要同步到別的服務的信息,當然這個消息表,每個消息都有一個狀態(tài)值,來標識這個消息有沒有被成功處理。

        發(fā)送放的業(yè)務邏輯以及消息表中數(shù)據(jù)的插入將在一個事務中完成,這樣避免了業(yè)務處理成功 + 事務消息發(fā)送失敗,或業(yè)務處理失敗 + 事務消息發(fā)送成功,這個問題。

        圖片

        舉個栗子:

        我們假定目前有兩個服務,訂單服務,購物車服務,用戶在購物車中對幾個商品進行合并下單,之后需要清空購物車中剛剛已經下單的商品信息。

        1、消息的生產方也就是訂單服務,完成了自己的邏輯(對商品進行下單操作)然后把這個消息通過 mq 發(fā)送到需要進行數(shù)據(jù)同步的其他服務中,也就是我們栗子中的購物車服務。

        2、其他服務(購物車服務)會監(jiān)聽這個隊列;

        1、如果收到這個消息,并且數(shù)據(jù)同步執(zhí)行成功了,當然這也是一個本地事務,就通過 mq 回復消息的生產方(訂單服務)消息已經處理了,然后生產方就能標識本次事務已經結束。如果是一個業(yè)務上的錯誤,就回復消息的生產方,需要進行數(shù)據(jù)回滾了。

        2、很久沒收到這個消息,這種情況是不會發(fā)生的,消息的發(fā)送方會有一個定時的任務,會定時重試發(fā)送消息表中還沒有處理的消息;

        3、消息的生產方(訂單服務)如果收到消息回執(zhí);

        1、成功的話就修改本次消息已經處理完,也就是本次分布式事務的同步已經完成;

        2、如果消息的結果是執(zhí)行失敗,同時在本地回滾本次事務,標識消息已經處理完成;

        3、如果消息丟失,也就是回執(zhí)消息沒有收到,這種情況也不太會發(fā)生,消息的發(fā)送方(訂單服務)會有一個定時的任務,定時重試發(fā)送消息表中還沒有處理的消息,下游的服務需要做冪等,可能會收到多次重復的消息,如果一個回復消息生產方中的某個回執(zhí)信息丟失了,后面持續(xù)收到生產方的 mq 消息,然后再次回復消息的生產方回執(zhí)信息,這樣總能保證發(fā)送者能成功收到回執(zhí),消息的生產方在接收回執(zhí)消息的時候也要做到冪等性。

        這里有兩個很重要的操作:

        1、服務器處理消息需要是冪等的,消息的生產方和接收方都需要做到冪等性;

        2、發(fā)送放需要添加一個定時器來遍歷重推未處理的消息,避免消息丟失,造成的事務執(zhí)行斷裂。

        該方案的優(yōu)缺點

        優(yōu)點:

        1、在設計層面上實現(xiàn)了消息數(shù)據(jù)的可靠性,不依賴消息中間件,弱化了對 mq 特性的依賴。

        2、簡單,易于實現(xiàn)。

        缺點:

        主要是需要和業(yè)務數(shù)據(jù)綁定到一起,耦合性比較高,使用相同的數(shù)據(jù)庫,會占用業(yè)務數(shù)據(jù)庫的一些資源。

        MQ事務-最終一致性

        下面分析下幾種消息隊列對事務的支持

        RocketMQ中如何處理事務

        RocketMQ 中的事務,它解決的問題是,確保執(zhí)行本地事務和發(fā)消息這兩個操作,要么都成功,要么都失敗。并且,RocketMQ 增加了一個事務反查的機制,來盡量提高事務執(zhí)行的成功率和數(shù)據(jù)一致性。

        圖片

        主要是兩個方面,正常的事務提交和事務消息補償

        正常的事務提交

        1、發(fā)送消息(half消息),這個 half 消息和普通消息的區(qū)別,在事務提交 之前,對于消費者來說,這個消息是不可見的。

        2、MQ SERVER寫入信息,并且返回響應的結果;

        3、根據(jù)MQ SERVER響應的結果,決定是否執(zhí)行本地事務,如果MQ SERVER寫入信息成功執(zhí)行本地事務,否則不執(zhí)行;

        4、根據(jù)本地事務執(zhí)行的狀態(tài),決定是否對事務進行 Commit 或者 Rollback。MQ SERVER收到 Commit,之后就會投遞該消息到下游的訂閱服務,下游的訂閱服務就能進行數(shù)據(jù)同步,如果是 Rollback 則該消息就會被丟失;

        如果MQ SERVER沒有收到 Commit 或者 Rollback 的消息,這種情況就需要進行補償流程了

        補償流程

        1、MQ SERVER如果沒有收到來自消息發(fā)送方的 Commit 或者 Rollback 消息,就會向消息發(fā)送端也就是我們的服務器發(fā)起一次查詢,查詢當前消息的狀態(tài);

        2、消息發(fā)送方收到對應的查詢請求,查詢事務的狀態(tài),然后把狀態(tài)重新推送給MQ SERVER,MQ SERVER就能之后后續(xù)的流程了。

        相比于本地消息表來處理分布式事務,MQ 事務是把原本應該在本地消息表中處理的邏輯放到了 MQ 中來完成。

        Kafka中如何處理事務

        Kafka 中的事務解決問題,確保在一個事務中發(fā)送的多條信息,要么都成功,要么都失敗。也就是保證對多個分區(qū)寫入操作的原子性。

        通過配合 Kafka 的冪等機制來實現(xiàn) Kafka 的 Exactly Once,滿足了讀取-處理-寫入這種模式的應用程序。當然 Kafka 中的事務主要也是來處理這種模式的。

        什么是讀取-處理-寫入模式呢?

        栗如:在流計算中,用 Kafka 作為數(shù)據(jù)源,并且將計算結果保存到 Kafka 這種場景下,數(shù)據(jù)從 Kafka 的某個主題中消費,在計算集群中計算,再把計算結果保存在 Kafka 的其他主題中。這個過程中,要保證每條消息只被處理一次,這樣才能保證最終結果的成功。Kafka 事務的原子性就保證了,讀取和寫入的原子性,兩者要不一起成功,要不就一起失敗回滾。

        這里來分析下 Kafka 的事務是如何實現(xiàn)的

        它的實現(xiàn)原理和 RocketMQ 的事務是差不多的,都是基于兩階段提交來實現(xiàn)的,在實現(xiàn)上可能更麻煩

        先來介紹下事務協(xié)調者,為了解決分布式事務問題,Kafka 引入了事務協(xié)調者這個角色,負責在服務端協(xié)調整個事務。這個協(xié)調者并不是一個獨立的進程,而是 Broker 進程的一部分,協(xié)調者和分區(qū)一樣通過選舉來保證自身的可用性。

        Kafka 集群中也有一個特殊的用于記錄事務日志的主題,里面記錄的都是事務的日志。同時會有多個協(xié)調者的存在,每個協(xié)調者負責管理和使用事務日志中的幾個分區(qū)。這樣能夠并行的執(zhí)行事務,提高性能。

        下面看下具體的流程

        • 1、首先在開啟事務的時候,生產者會給協(xié)調者發(fā)送一個開啟事務的請求,協(xié)調者在事務日志中記錄下事務ID;
        • 2、然后生產者開始發(fā)送事務消息給協(xié)調者,不過需要先發(fā)送消息告知協(xié)調者在哪個主題和分區(qū),之后就正常的發(fā)送事務消息,這些事務消息不像 RocketMQ 會保存在特殊的隊列中,Kafka 未提交的事務消息和普通的消息一樣,只是在消費的時候依賴客戶端進行過濾。
        • 3、消息發(fā)送完成,生產者根據(jù)自己的執(zhí)行的狀態(tài)對協(xié)調者進行事務的提交或者回滾;

        事務的提交

        1、協(xié)調者設置事務的狀態(tài)為PrepareCommit,寫入到事務日志中;

        2、協(xié)調者在每個分區(qū)中寫入事務結束的標識,然后客戶端就能把之前過濾的未提交的事務消息放行給消費端進行消費了;

        事務的回滾

        1、協(xié)調者設置事務的狀態(tài)為PrepareAbort,寫入到事務日志中;

        2、協(xié)調者在每個分區(qū)中寫入事務回滾的標識,然后之前未提交的事務消息就能被丟棄了;

        這里引用一下【消息隊列高手課中的圖片】

        圖片

        RabbitMQ中的事務

        RabbitMQ 中事務解決的問題是確保生產者的消息到達MQ SERVER,這和其他 MQ 事務還是有點差別的,這里也不展開討論了。

        消息防丟失

        先來分析下一條消息在 MQ 中流轉所經歷的階段。

        圖片

        生產階段:生產者產生消息,通過網絡發(fā)送到 Broker 端。

        存儲階段:Broker 拿到消息,需要進行落盤,如果是集群版的 MQ 還需要同步數(shù)據(jù)到其他節(jié)點。

        消費階段:消費者在 Broker 端拉數(shù)據(jù),通過網絡傳輸?shù)竭_消費者端。

        生產階段防止消息丟失

        發(fā)生網絡丟包、網絡故障等這些會導致消息的丟失

        RabbitMQ 中的防丟失措施
        • 1、對于可以感知的錯誤,我們捕獲錯誤,然后重新投遞;
        • 2、通過 RabbitMQ 中的事務解決,RabbitMQ 中的事務解決的就是生產階段消息丟失的問題;

        在生產者發(fā)送消息之前,通過channel.txSelect開啟一個事務,接著發(fā)送消息, 如果消息投遞 server 失敗,進行事務回滾channel.txRollback,然后重新發(fā)送, 如果 server 收到消息,就提交事務channel.txCommit

        不過使用事務性能不好,這是同步操作,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ Server的回應,之后才能繼續(xù)發(fā)送下一條消息,生產者生產消息的吞吐量和性能都會大大降低。

        • 3、使用發(fā)送確認機制。

        使用確認機制,生產者將信道設置成 confirm 確認模式,一旦信道進入 confirm 模式,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ 就會發(fā)送一個確認(Basic.Ack)給生產者(包含消息的唯一 deliveryTag 和 multiple 參數(shù)),這就使得生產者知曉消息已經正確到達了目的地了。

        multiple 為 true 表示的是批量的消息確認,為 true 的時候,表示小于等于返回的 deliveryTag 的消息 id 都已經確認了,為 false 表示的是消息 id 為返回的 deliveryTag 的消息,已經確認了。

        圖片

        確認機制有三種類型

        1、同步確認

        2、批量確認

        3、異步確認

        同步模式的效率很低,因為每一條消息度都需要等待確認好之后,才能處理下一條;

        批量確認模式相比同步模式效率是很高,不過有個致命的缺陷,一旦回復確認失敗,當前確認批次的消息會全部重新發(fā)送,導致消息重復發(fā)送;

        異步模式就是個很好的選擇了,不會有同步模式的阻塞問題,同時效率也很高,是個不錯的選擇。

        Kafka 中的防丟失措施

        Kafaka 中引入了一個 broker。broker 會對生產者和消費者進行消息的確認,生產者發(fā)送消息到 broker,如果沒有收到 broker 的確認就可以選擇繼續(xù)發(fā)送。

        只要 Producer 收到了 Broker 的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發(fā)送確認響應后,會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。

        只要正確處理 Broker 的確認響應,就可以避免消息的丟失。

        RocketMQ 中的防丟失措施
        • 使用 SYNC 的發(fā)送消息方式,等待 broker 處理結果

        RocketMQ 提供了3種發(fā)送消息方式,分別是:

        同步發(fā)送:Producer 向 broker 發(fā)送消息,阻塞當前線程等待 broker 響應 發(fā)送結果。

        異步發(fā)送:Producer 首先構建一個向 broker 發(fā)送消息的任務,把該任務提交給線程池,等執(zhí)行完該任務時,回調用戶自定義的回調函數(shù),執(zhí)行處理結果。

        Oneway發(fā)送:Oneway 方式只負責發(fā)送請求,不等待應答,Producer 只負責把請求發(fā)出去,而不處理響應結果。

        • 使用事務,RocketMQ 中的事務,它解決的問題是,確保執(zhí)行本地事務和發(fā)消息這兩個操作,要么都成功,要么都失敗。

        存儲階段

        在存儲階段正常情況下,只要 Broker 在正常運行,就不會出現(xiàn)丟失消息的問題,但是如果 Broker 出現(xiàn)了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。

        RabbitMQ 中的防丟失措施

        防止在存儲階段消息額丟失,可以做持久化,防止異常情況(重啟,關閉,宕機)。。。

        RabbitMQ 持久化中有三部分:

        • 交換器的持久化

        交換器的持久化,是通過在聲明隊列時將 durable 參數(shù)置為 true 實現(xiàn)的,不設置持久化的話,交換器的信息將會丟失。

        • 隊列持久化

        隊列的持久化,是通過在聲明隊列時將 durable 參數(shù)置為 true 實現(xiàn)的,隊列的持久化能保證其本身的元數(shù)據(jù)不會因異常情況而丟失,但是并不能保證內部所存儲的消息不會丟失。

        • 消息的持久化

        消息的持久化,在投遞時指定 delivery_mode=2(1是非持久化),消息的持久化,需要配合隊列的持久,只設置消息的持久化,重啟之后隊列消失,繼而消息也會丟失。所以如果只設置消息持久化而不設置隊列的持久化意義不大。

        對于持久化,如果所有的消息都設置持久化,會影響寫入的性能,所以可以選擇對可靠性要求比較高的消息進行持久化處理。

        不過消息持久化并不能百分之百避免消息的丟失

        比如數(shù)據(jù)在落盤的過程中宕機了,消息還沒及時同步到內存中,這也是會丟數(shù)據(jù)的,這種問題可以通過引入鏡像隊列來解決。

        鏡像隊列的作用:引入鏡像隊列,可已將隊列鏡像到集群中的其他 Broker 節(jié)點之上,如果集群中的一個節(jié)點失效了,隊列能夠自動切換到鏡像中的另一個節(jié)點上來保證服務的可用性。(更細節(jié)的這里不展開討論了)

        Kafka 中的防丟失措施

        操作系統(tǒng)本身有一層緩存,叫做 Page Cache,當往磁盤文件寫入的時候,系統(tǒng)會先將數(shù)據(jù)流寫入緩存中。

        Kafka 收到消息后也會先存儲在也緩存中(Page Cache)中,之后由操作系統(tǒng)根據(jù)自己的策略進行刷盤或者通過 fsync 命令強制刷盤。如果系統(tǒng)掛掉,在 PageCache 中的數(shù)據(jù)就會丟失。也就是對應的 Broker 中的數(shù)據(jù)就會丟失了。

        圖片

        處理思路

        1、控制競選分區(qū) leader 的 Broker。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。

        2、控制消息能夠被寫入到多個副本中才能提交,這樣避免上面的問題1。

        RocketMQ 中的防丟失措施

        1、將刷盤方式改成同步刷盤;

        2、對于多個節(jié)點的 Broker,需要將 Broker 集群配置成:至少將消息發(fā)送到 2 個以上的節(jié)點,再給客戶端回復發(fā)送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發(fā)生消息丟失。

        消費階段

        消費階段就很簡單了,如果在網絡傳輸中丟失,這個消息之后還會持續(xù)的推送給消費者,在消費階段我們只需要控制在業(yè)務邏輯處理完成之后再去進行消費確認就行了。

        總結:對于消息的丟失,也可以借助于本地消息表的思路,消息產生的時候進行消息的落盤,長時間未處理的消息,使用定時重推到隊列中。

        消息重復發(fā)送

        消息在 MQ 中的傳遞,大致可以歸類為下面三種:

        1、At most once: 至多一次。消息在傳遞時,最多會被送達一次。是不安全的,可能會丟數(shù)據(jù)。

        2、At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現(xiàn)。

        3、Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。

        大部分消息隊列滿足的都是At least once,也就是可以允許重復的消息出現(xiàn)。

        我們消費者需要滿足冪等性,通常有下面幾種處理方案

        1、利用數(shù)據(jù)庫的唯一性

        根據(jù)業(yè)務情況,選定業(yè)務中能夠判定唯一的值作為數(shù)據(jù)庫的唯一鍵,新建一個流水表,然后執(zhí)行業(yè)務操作和流水表數(shù)據(jù)的插入放在同一事務中,如果流水表數(shù)據(jù)已經存在,那么就執(zhí)行失敗,借此保證冪等性。也可先查詢流水表的數(shù)據(jù),沒有數(shù)據(jù)然后執(zhí)行業(yè)務,插入流水表數(shù)據(jù)。不過需要注意,數(shù)據(jù)庫讀寫延遲的情況。

        2、數(shù)據(jù)庫的更新增加前置條件

        3、給消息帶上唯一ID

        每條消息加上唯一ID,利用方法1中通過增加流水表,借助數(shù)據(jù)庫的唯一性來處理重復消息的消費。

        我們創(chuàng)建了一個高質量的技術交流群,與優(yōu)秀的人在一起,自己也會優(yōu)秀起來,趕緊點擊加群,享受一起成長的快樂。另外,如果你最近想跳槽的話,年前我花了2周時間收集了一波大廠面經,節(jié)后準備跳槽的可以點擊這里領取!

        推薦閱讀

        ··································

        你好,我是程序猿DD,10年開發(fā)老司機、阿里云MVP、騰訊云TVP、出過書創(chuàng)過業(yè)、國企4年互聯(lián)網6年。從普通開發(fā)到架構師、再到合伙人。一路過來,給我最深的感受就是一定要不斷學習并關注前沿。只要你能堅持下來,多思考、少抱怨、勤動手,就很容易實現(xiàn)彎道超車!所以,不要問我現(xiàn)在干什么是否來得及。如果你看好一個事情,一定是堅持了才能看到希望,而不是看到希望才去堅持。相信我,只要堅持下來,你一定比現(xiàn)在更好!如果你還沒什么方向,可以先關注我,這里會經常分享一些前沿資訊,幫你積累彎道超車的資本。

        點擊領取2022最新10000T學習資料
        瀏覽 48
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            欧美自拍偷拍 | 女人19毛片水真多免费视频 | 青娱乐在线观看 | 神马久久午夜 | 欧美另类久久 | 免费久草视频 | 巨肉1v1公妇h怀孕徐婉婉 | 日本少妇激情舌吻 | 久久影院三级片 | 大香蕉草逼 |