1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Spring Boot整合RabbitMQ詳細教程

        共 8186字,需瀏覽 17分鐘

         ·

        2020-12-28 03:28

        點擊上方藍色字體,選擇“標星公眾號”

        優(yōu)質(zhì)文章,第一時間送達

        ? 作者?|??小小丑年紀

        來源 |? urlify.cn/MZvAry

        66套java從入門到精通實戰(zhàn)課程分享

        1.首先我們簡單了解一下消息中間件的應(yīng)用場景

        異步處理

        場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種1.串行的方式;2.并行的方式?
        (1)串行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送注冊郵件,再發(fā)送注冊短信,以上三個任務(wù)全部完成后才返回給客戶端。這有一個問題是,郵件,短信并不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.

        (2)并行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送郵件的同時,發(fā)送短信,以上三個任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時間。?
        ?
        假設(shè)三個業(yè)務(wù)節(jié)點分別使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并性已經(jīng)提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網(wǎng)站沒有任何影響,客戶端沒有必要等著其發(fā)送完成才顯示注冊成功,應(yīng)該是寫入數(shù)據(jù)庫后就返回.
        (3)消息隊列?
        引入消息隊列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理?
        ?
        由此可以看出,引入消息隊列后,用戶的響應(yīng)時間就等于寫入數(shù)據(jù)庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列后處理后,響應(yīng)時間是串行的3倍,是并行的2倍。

        ?應(yīng)用解耦

        場景:雙11是購物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口.
        ?
        這種做法有一個缺點:

        • 當(dāng)庫存系統(tǒng)出現(xiàn)故障時,訂單就會失敗。

        • 訂單系統(tǒng)和庫存系統(tǒng)高耦合.?
          引入消息隊列?

        • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

        • 庫存系統(tǒng):訂閱下單的消息,獲取下單消息,進行庫操作。?
          就算庫存系統(tǒng)出現(xiàn)故障,消息隊列也能保證消息的可靠投遞,不會導(dǎo)致消息丟失。

        流量削峰

        流量削峰一般在秒殺活動中應(yīng)用廣泛?
        場景:秒殺活動,一般會因為流量過大,導(dǎo)致應(yīng)用掛掉,為了解決這個問題,一般在應(yīng)用前端加入消息隊列。?
        作用:?
        1.可以控制活動人數(shù),超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)?
        2.可以緩解短時間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)?
        ?
        1.用戶的請求,服務(wù)器收到之后,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面.?

        2.秒殺業(yè)務(wù)根據(jù)消息隊列中的請求信息,再做后續(xù)處理.

        以上內(nèi)容的來源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝

        2.各種消息中間件性能的比較:

        TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

        持久化消息比較—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者MQ所在的服務(wù)器down了,消息不會丟失的機制。

        可靠性、靈活的路由、集群、事務(wù)、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統(tǒng)、社區(qū)—RabbitMq最好,ActiveMq次之,ZeroMq最差。

        高并發(fā)—從實現(xiàn)語言來看,RabbitMQ最高,原因是它的實現(xiàn)語言是天生具備高并發(fā)高可用的erlang語言。

        綜上所述:RabbitMQ的性能相對來說更好更全面,是消息中間件的首選。

        3.接下來我們在springboot當(dāng)中整合使用RabbitMQ

        第一步:導(dǎo)入maven依賴


        org.springframework.boot
        spring-boot-starter-amqp
        1.5.2.RELEASE

        第二步:在application.properties文件當(dāng)中引入RabbitMQ基本的配置信息

        #對于rabbitMQ的支持
        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=guest
        spring.rabbitmq.password=guest

        第三步:編寫RabbitConfig類,類里面設(shè)置很多個EXCHANGE,QUEUE,ROUTINGKEY,是為了接下來的不同使用場景。

        /**
        Broker:它提供一種傳輸服務(wù),它的角色就是維護一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進行傳輸,
        Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
        Queue:消息的載體,每個消息都會被投到一個或多個隊列。
        Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來.
        Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞。
        vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權(quán)限分離。
        Producer:消息生產(chǎn)者,就是投遞消息的程序.
        Consumer:消息消費者,就是接受消息的程序.
        Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
        */
        @Configuration
        public?class?RabbitConfig?{
        ?
        private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
        ?
        @Value("${spring.rabbitmq.host}")
        private?String?host;
        ?
        @Value("${spring.rabbitmq.port}")
        private?int?port;
        ?
        @Value("${spring.rabbitmq.username}")
        private?String?username;
        ?
        @Value("${spring.rabbitmq.password}")
        private?String?password;
        ?
        ?
        public?static?final?String?EXCHANGE_A?=?"my-mq-exchange_A";
        public?static?final?String?EXCHANGE_B?=?"my-mq-exchange_B";
        public?static?final?String?EXCHANGE_C?=?"my-mq-exchange_C";
        ?
        ?
        public?static?final?String?QUEUE_A?=?"QUEUE_A";
        public?static?final?String?QUEUE_B?=?"QUEUE_B";
        public?static?final?String?QUEUE_C?=?"QUEUE_C";
        ?
        public?static?final?String?ROUTINGKEY_A?=?"spring-boot-routingKey_A";
        public?static?final?String?ROUTINGKEY_B?=?"spring-boot-routingKey_B";
        public?static?final?String?ROUTINGKEY_C?=?"spring-boot-routingKey_C";
        ?
        @Bean
        public?ConnectionFactory?connectionFactory()?{
        CachingConnectionFactory?connectionFactory?=?new?CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return?connectionFactory;
        }
        ?
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        //必須是prototype類型
        public?RabbitTemplate?rabbitTemplate()?{
        RabbitTemplate?template?=?new?RabbitTemplate(connectionFactory());
        return?template;
        }
        }

        第四步:編寫消息的生產(chǎn)者

        @Component
        public?class?MsgProducer?implements?RabbitTemplate.ConfirmCallback?{
        ?
        private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
        ?
        //由于rabbitTemplate的scope屬性設(shè)置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入
        private?RabbitTemplate?rabbitTemplate;
        /**
        *?構(gòu)造方法注入rabbitTemplate
        */
        @Autowired
        public?MsgProducer(RabbitTemplate?rabbitTemplate)?{
        this.rabbitTemplate?=?rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);?//rabbitTemplate如果為單例的話,那回調(diào)就是最后設(shè)置的內(nèi)容
        }
        ?
        public?void?sendMsg(String?content)?{
        CorrelationData?correlationId?=?new?CorrelationData(UUID.randomUUID().toString());
        //把消息放入ROUTINGKEY_A對應(yīng)的隊列當(dāng)中去,對應(yīng)的是隊列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,?RabbitConfig.ROUTINGKEY_A,?content,?correlationId);
        }
        /**
        *?回調(diào)
        */
        @Override
        public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
        logger.info("?回調(diào)id:"?+?correlationData);
        if?(ack)?{
        logger.info("消息成功消費");
        }?else?{
        logger.info("消息消費失敗:"?+?cause);
        }
        }
        }

        第五步:把交換機,隊列,通過路由關(guān)鍵字進行綁定,寫在RabbitConfig類當(dāng)中

        /**
        *?針對消費者配置
        *?1.?設(shè)置交換機類型
        *?2.?將隊列綁定到交換機
        FanoutExchange:?將消息分發(fā)到所有的綁定隊列,無routingkey的概念
        HeadersExchange :通過添加屬性key-value匹配
        DirectExchange:按照routingkey分發(fā)到指定隊列
        TopicExchange:多關(guān)鍵字匹配
        */
        @Bean
        public?DirectExchange?defaultExchange()?{
        return?new?DirectExchange(EXCHANGE_A);
        }
        /**
        *?獲取隊列A
        *?@return
        */
        @Bean
        public?Queue?queueA()?{
        return?new?Queue(QUEUE_A,?true);?//隊列持久
        }
        @Bean
        public?Binding?binding()?{
        ?
        return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
        }

        一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發(fā)到不同的隊列當(dāng)中去。

        @Bean
        public?Binding?binding()?{
        return?BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
        }
        @Bean
        public?Binding?bindingB(){
        return?BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
        }

        第六步:編寫消息的消費者,這一步也是最復(fù)雜的,因為可以編寫出很多不同的需求出來,寫法也有很多的不同。

        ? ? 比如一個生產(chǎn)者,一個消費者

        @Component
        @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
        public?class?MsgReceiver?{
        ?
        private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
        ?
        @RabbitHandler
        public?void?process(String?content)?{
        logger.info("接收處理隊列A當(dāng)中的消息:?"?+?content);
        }
        ?
        }

        比如一個生產(chǎn)者,多個消費者,可以寫多個消費者,并且他們的分發(fā)是負載均衡的。

        @Component
        @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
        public?class?MsgReceiverC_one?{
        private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
        ?
        @RabbitHandler
        public?void?process(String?content)?{
        logger.info("處理器one接收處理隊列A當(dāng)中的消息:?"?+?content);
        }
        }
        @Component
        @RabbitListener(queues?=?RabbitConfig.QUEUE_A)
        public?class?MsgReceiverC_two?{
        private?final?Logger?logger?=?LoggerFactory.getLogger(this.getClass());
        @RabbitHandler
        public?void?process(String?content)?{
        logger.info("處理器two接收處理隊列A當(dāng)中的消息:?"?+?content);
        }
        ?
        }


        另外一種消息處理機制的寫法如下,在RabbitMQConfig類里面增加bean:

        @Bean
        public?SimpleMessageListenerContainer?messageContainer()?{
        //加載處理消息A的隊列
        SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer(connectionFactory());
        //設(shè)置接收多個隊列里面的消息,這里設(shè)置接收隊列A
        //假如想一個消費者處理多個隊列里面的信息可以如下設(shè)置:
        //container.setQueues(queueA(),queueB(),queueC());
        container.setQueues(queueA());
        container.setExposeListenerChannel(true);
        //設(shè)置最大的并發(fā)的消費者數(shù)量
        container.setMaxConcurrentConsumers(10);
        //最小的并發(fā)消費者的數(shù)量
        container.setConcurrentConsumers(1);
        //設(shè)置確認模式手工確認
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new?ChannelAwareMessageListener()?{
        @Override
        public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
        /**通過basic.qos方法設(shè)置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,
        換句話說,在接收到該Consumer的ack前,它不會將新的Message分發(fā)給它?*/
        channel.basicQos(1);
        byte[]?body?=?message.getBody();
        logger.info("接收處理隊列A當(dāng)中的消息:"?+?new?String(body));
        /**為了保證永遠不會丟失消息,RabbitMQ支持消息應(yīng)答機制。
        當(dāng)消費者接收到消息并完成任務(wù)后會往RabbitMQ服務(wù)器發(fā)送一條確認的命令,然后RabbitMQ才會將消息刪除。*/
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),?false);
        }
        });
        return?container;
        }

        下面是當(dāng)一個消費者,處理多個隊列里面的信息打印的log

        ?

        Fanout Exchange

        Fanout 就是我們熟悉的廣播模式,給Fanout交換機發(fā)送消息,綁定了這個交換機的所有隊列都收到這個消息。

        //配置fanout_exchange
        @Bean
        FanoutExchange?fanoutExchange()?{
        return?new?FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
        }
        ?
        //把所有的隊列都綁定到這個交換機上去
        @Bean
        Binding?bindingExchangeA(Queue?queueA,FanoutExchange?fanoutExchange)?{
        return?BindingBuilder.bind(queueA).to(fanoutExchange);
        }
        @Bean
        Binding?bindingExchangeB(Queue?queueB,?FanoutExchange?fanoutExchange)?{
        return?BindingBuilder.bind(queueB).to(fanoutExchange);
        }
        @Bean
        Binding?bindingExchangeC(Queue?queueC,?FanoutExchange?fanoutExchange)?{
        return?BindingBuilder.bind(queueC).to(fanoutExchange);
        }

        消息發(fā)送,這里不設(shè)置routing_key,因為設(shè)置了也無效,發(fā)送端的routing_key寫任何字符都會被忽略。

        public?void?sendAll(String?content)?{
        rabbitTemplate.convertAndSend("fanoutExchange","",?content);
        }

        消息處理的結(jié)果如下所示:

        ?





        粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

        ???

        ?長按上方微信二維碼?2 秒


        感謝點贊支持下哈?

        瀏覽 50
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            亚洲综合成人视频 | 香蕉卖淫视频 | 91麻豆精品在线 | 丁香五月婷婷激情啪啪 | 色视屏综合无码一区二区三区 | 欧美精品无码成人A片九色播放 | 一级片视频免费观看 | 做爱久久 | 操bb视频 | 欧美成人精品激情凹凸视频 |