1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        分布式事務解決方案之RabbitMQ

        共 9233字,需瀏覽 19分鐘

         ·

        2022-02-14 21:46

        點擊上方 Java學習之道,選擇 設為星標

        每天18:30點,干貨準時奉上!

        作者: _沸羊羊_
        來源: juejin.cn/post/7007040162676883486

        Part1前言

        之前介紹了分布式事務解決方案:2PC,3PC,TCC機制,今天主要介紹基于 RabbitMQ 的分布式事務解決方案,Let's go !

        Part2分布式事務問題

        以電商業(yè)務為例,訂單服務對應的訂單數(shù)據(jù)庫,庫存服務對應的庫存數(shù)據(jù)庫,多個數(shù)據(jù)源之間存在了分布式事務問題。如何保證在訂單生成后,正確的扣除庫存,或者在訂單生成失敗時,還原扣除的庫存,這就是分布式事務將要解決的問題。

        解決分布式事務前的業(yè)務邏輯:

        @Transaction(rollbackFor=Exception.class)
        public void createOrder(OrderInfo orderInfothrows Exception 
        {
            // 訂單系統(tǒng) —— 生成訂單
            orderService.saveOrder(orderInfo);
            
            // 庫存系統(tǒng) —— 扣除庫存
            if(!"success".equals(reduceStock(orderInfo))){
                throw new Exception("訂單創(chuàng)建失敗,原因:庫存扣除失敗!");
            }
        }

        上述存在的問題:

        • 庫存服務的接口調用成功,訂單數(shù)據(jù)庫事務提交失敗,庫存沒有回滾,導致訂單生成失敗但扣除了庫存
        • 庫存服務的接口調用超時,訂單系統(tǒng)數(shù)據(jù)庫事務被回滾,庫存系統(tǒng)接口繼續(xù)執(zhí)行,導致訂單生成失敗但扣除了庫存

        Part3解決方案

        生產(chǎn)者:訂單系統(tǒng)將訂單信息的消息發(fā)送到 RabbitMQ 中,消息到達交換機后通過 confirm 機制將接收成功的消息返回給訂單系統(tǒng)(生產(chǎn)者),生產(chǎn)者收到正確的 confirm 后給用戶返回訂單創(chuàng)建成功的響應。

        隊列:為了防止 RabbitMQ 在收到消息后還未發(fā)送給消費者處理就掛了的情況,將隊列和消息分別持久化。

        消費者:消費者接收到消息后,為了確保消息能夠被成功正確的處理,需要考慮消息丟失問題和消息冪等性等問題。

        可靠的發(fā)送消息

        在訂單系統(tǒng)創(chuàng)建訂單的事務中添加把消息保存到訂單數(shù)據(jù)庫的業(yè)務邏輯,將消息在生產(chǎn)者中持久化。

        public class OrderDBService {
            public void createOrder(OrderInfo orderInfo) throws Exception {
                // 創(chuàng)建訂單
                String sql = "...";
                int count = jdbcTemplate.insert(sql);
                if(count != 1) {
                    throw new Exception("訂單創(chuàng)建失敗,原因:數(shù)據(jù)庫操作失敗!");
                }
                
                // 將消息保存到 MQ
                saveMQMessage(orderInfo);
            }
            
            /** 
             * 將消息持久化,此時的消息狀態(tài)字段為:未發(fā)送
             */

            public void saveMQMessage(OrderInfo orderInfo) throws Exception {
                String sql = "...";
                int count = jdbcTemplate.insert(sql);
                if(count != 1) {
                    throw new Exception("消息記錄失敗,原因:數(shù)據(jù)庫操作失??!");
                }
            }
        }

        創(chuàng)建訂單成功后,將訂單消息發(fā)送到 MQ 中,開啟發(fā)布確定機制;生產(chǎn)者收到確認后在回調方法中更新本地消息表的消息狀態(tài)為:已發(fā)送。

        spring:
          rabbitmq:
            ......
            # 開啟 confirm 模式
            publisher-confirm-type: correlated
        @Component
        @Slf4j
        public class MQService {

            @Autowired
            private RabbitTemplate rabbitTemplate;

            public void send(OrderInfo orderInfo) {
                // 發(fā)送消息到 MQ
                rabbitTemplate.convertAndSend("directEx""", orderInfo);
                // 回調
                rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                    if (ack) {
                        log.info("消息發(fā)送成功!");
                    } else {
                        log.info("消息發(fā)送失?。?);
                    }
                });
            }
        }

        消息的持久化

        將隊列聲明為持久化

        @Bean
        public Queue getRoutingQueue() {
            // 參數(shù):queueName,durable,exclusive,autoDelete
            return new Queue("routingQueue"truefalsetrue);
        }

        將消息聲明為持久化

        public void send(String msg) {
            // 持久化消息
            Message message = MessageBuilder.withBody(msg.getBytes()).build();
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // 發(fā)送消息到 MQ
            rabbitTemplate.convertAndSend("directEx""", message);
            // 回調
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息發(fā)送成功!");
                } else {
                    log.info("消息發(fā)送失??!");
                }
            });
        }

        可靠的消費消息

        保證消費者可靠消費有兩個方面,消息丟失和消息冪等性問題。

        使用手動ACK機制解決消息丟失問題。

        spring:
          rabbitmq:
            listener:
                simple:
                    # 開啟手動ACK
                    acknowledge-mode: manual
        @Component
        @Slf4j
        @RabbitListener(queues = "routingQueue")
        public class Consumer {

            @RabbitHandler
            public void process(String msg, Message message, Channel channel) throws IOException {
                try {
                    log.info("consumer received msg : " + msg);
                    // 手動ACK
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    log.info("消息入隊失敗");
                    // 重新入隊
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
                }
            }
        }

        配置 RabbitMQ 的 retry 機制,當消費者出現(xiàn)發(fā)送 NACK或異常時,會根據(jù)配置的 retry 機制進行重試。

        解決消息的冪等性問題可以給消息添加 messageID,并且消費者消費成功后放入將 messageID 放入 redis 中。

        // 生產(chǎn)者發(fā)送消息前設置 messageID
        message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
        @Component
        @Slf4j
        @RabbitListener(queues = "routingQueue")
        public class Consumer {

            @Autowired
            private RedisTemplate redisTemplate;

            @RabbitHandler
            public void process(String msg, Message message, Channel channel) throws IOException {
                // redis 的 setnx 命令,如果 messageID 已存在代表此消息已被消費過
                if (!redisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(), msg)) {
                    return;
                }
                try {
                    log.info("consumer received msg : " + msg);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("消息入隊失敗");
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
                }
            }
        }

        綜上,使用手動 ACK 和 retry 機制解決了消息丟失的問題,使用 messageID 存到 redis 解決了消息冪等性問題。

        Part4總結

        -- END --

         | 更多精彩文章 -



        加我微信,交個朋友
        長按/掃碼添加↑↑↑

        瀏覽 46
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            被舔的好爽 | 大鸡巴操老逼 | 国产hxc132乱人免费视频 | 三级网站网址 | 国产无码做爱视频 | 欧美黄色一级视频 | 操吊网| 草逼大片免费看 | 国产精品系列在线播放 | 国产无码一二三四 |