1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        RabbitMQ 延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的正確姿勢(shì),你學(xué)會(huì)了么?

        共 9172字,需瀏覽 19分鐘

         ·

        2021-06-24 03:31

        程序員的成長(zhǎng)之路
        互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享 
        關(guān)注


        閱讀本文大概需要 5.5 分鐘。

        來(lái)自:blog.csdn.net/wantnrun/article/details/80401641

        場(chǎng)景

        開(kāi)發(fā)中經(jīng)常需要用到定時(shí)任務(wù),對(duì)于商城來(lái)說(shuō),定時(shí)任務(wù)尤其多,比如優(yōu)惠券定時(shí)過(guò)期、訂單定時(shí)關(guān)閉、微信支付2小時(shí)未支付關(guān)閉訂單等等,都需要用到定時(shí)任務(wù),但是定時(shí)任務(wù)本身有一個(gè)問(wèn)題,一般來(lái)說(shuō)我們都是通過(guò)定時(shí)輪詢查詢數(shù)據(jù)庫(kù)來(lái)判斷是否有任務(wù)需要執(zhí)行。
        也就是說(shuō)不管怎么樣,我們需要先查詢數(shù)據(jù)庫(kù),而且有些任務(wù)對(duì)時(shí)間準(zhǔn)確要求比較高的,需要每秒查詢一次,對(duì)于系統(tǒng)小倒是無(wú)所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實(shí)了。所以需要其他方式的,當(dāng)然實(shí)現(xiàn)的方式有多種多樣的,比如Redis實(shí)現(xiàn)定時(shí)隊(duì)列、基于優(yōu)先級(jí)隊(duì)列的JDK延遲隊(duì)列、時(shí)間輪等。
        因?yàn)槲覀冺?xiàng)目中本身就使用到了Rabbitmq,所以基于方便開(kāi)發(fā)和維護(hù)的原則,我們使用了Rabbitmq延遲隊(duì)列來(lái)實(shí)現(xiàn)定時(shí)任務(wù)。

        Rabbitmq延遲隊(duì)列

        Rabbitmq本身是沒(méi)有延遲隊(duì)列的,只能通過(guò)Rabbitmq本身隊(duì)列的特性來(lái)實(shí)現(xiàn),想要Rabbitmq實(shí)現(xiàn)延遲隊(duì)列,需要使用Rabbitmq的死信交換機(jī)(Exchange)和消息的存活時(shí)間TTL(Time To Live)

        死信交換機(jī)

        一個(gè)消息在滿足如下條件下,會(huì)進(jìn)死信交換機(jī),記住這里是交換機(jī)而不是隊(duì)列,一個(gè)交換機(jī)可以對(duì)應(yīng)很多隊(duì)列。
        1. 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
        2. 上面的消息的TTL到了,消息過(guò)期了。
        3. 隊(duì)列的長(zhǎng)度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
        死信交換機(jī)就是普通的交換機(jī),只是因?yàn)槲覀儼堰^(guò)期的消息扔進(jìn)去,所以叫死信交換機(jī),并不是說(shuō)死信交換機(jī)是某種特定的交換機(jī)

        消息TTL(消息存活時(shí)間)

        消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。
        如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。
        byte[] messageBodyBytes = "Hello, world!".getBytes();
        AMQP.BasicProperties properties = new AMQP.BasicProperties();
        properties.setExpiration("60000");
        channel.basicPublish("my-exchange""queue-key", properties, messageBodyBytes);
        可以通過(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個(gè)int類型的字符串:
        當(dāng)上面的消息扔到隊(duì)列中后,過(guò)了60秒,如果沒(méi)有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒(méi)有“死掉”的消息對(duì)頂上來(lái),被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去
        處理流程圖
        圖片

        創(chuàng)建交換機(jī)(Exchanges)和隊(duì)列(Queues)

        創(chuàng)建死信交換機(jī)

        圖片
        如圖所示,就是創(chuàng)建一個(gè)普通的交換機(jī),這里為了方便區(qū)分,把交換機(jī)的名字取為:delay

        創(chuàng)建自動(dòng)過(guò)期消息隊(duì)列

        這個(gè)隊(duì)列的主要作用是讓消息定時(shí)過(guò)期的,比如我們需要2小時(shí)候關(guān)閉訂單,我們就需要把消息放進(jìn)這個(gè)隊(duì)列里面,把消息過(guò)期時(shí)間設(shè)置為2小時(shí)
        圖片
        創(chuàng)建一個(gè)一個(gè)名為delay_queue1的自動(dòng)過(guò)期的隊(duì)列,當(dāng)然圖片上面的參數(shù)并不會(huì)讓消息自動(dòng)過(guò)期,因?yàn)槲覀儾](méi)有設(shè)置x-message-ttl參數(shù),如果整個(gè)隊(duì)列的消息有消息都是相同的,可以設(shè)置,這里為了靈活,所以并沒(méi)有設(shè)置。
        另外兩個(gè)參數(shù)x-dead-letter-exchange代表消息過(guò)期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過(guò)期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個(gè)道理,根據(jù)這個(gè)key將消息放入不同的隊(duì)列

        創(chuàng)建消息處理隊(duì)列

        這個(gè)隊(duì)列才是真正處理消息的隊(duì)列,所有進(jìn)入這個(gè)隊(duì)列的消息都會(huì)被處理
        圖片
        消息隊(duì)列的名字為delay_queue2

        消息隊(duì)列綁定到交換機(jī)

        進(jìn)入交換機(jī)詳情頁(yè)面,將創(chuàng)建的2個(gè)隊(duì)列(delayqueue1和delayqueue2)綁定到交換機(jī)上面
        圖片
        自動(dòng)過(guò)期消息隊(duì)列的routing key 設(shè)置為delay
        綁定delayqueue2
        圖片
        delayqueue2 的key要設(shè)置為創(chuàng)建自動(dòng)過(guò)期的隊(duì)列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過(guò)期的時(shí)候就可以自動(dòng)把消息放入delay_queue2這個(gè)隊(duì)列中了
        綁定后的管理頁(yè)面如下圖:
        圖片
        當(dāng)然這個(gè)綁定也可以使用代碼來(lái)實(shí)現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺(tái)來(lái)操作
        發(fā)送消息
        String msg = "hello word";
        MessageProperties messageProperties = new MessageProperties();
          messageProperties.setExpiration("6000");
          messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
          Message message = new Message(msg.getBytes(), messageProperties);
          rabbitTemplate.convertAndSend("delay""delay",message);
        主要的代碼就是
        messageProperties.setExpiration("6000");
        設(shè)置了讓消息6秒后過(guò)期
        注意:因?yàn)橐屜⒆詣?dòng)過(guò)期,所以一定不能設(shè)置delay_queue1的監(jiān)聽(tīng),不能讓這個(gè)隊(duì)列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過(guò)期了
        接收消息
        接收消息配置好delay_queue2的監(jiān)聽(tīng)就好了
        package wang.raye.rabbitmq.demo1;

        import org.springframework.amqp.core.AcknowledgeMode;
        import org.springframework.amqp.core.Binding;
        import org.springframework.amqp.core.BindingBuilder;
        import org.springframework.amqp.core.DirectExchange;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.core.Queue;
        import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
        import org.springframework.amqp.rabbit.connection.ConnectionFactory;
        import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
        import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;

        @Configuration
        public class DelayQueue {
         /** 消息交換機(jī)的名字*/
         public static final String EXCHANGE = "delay";
         /** 隊(duì)列key1*/
         public static final String ROUTINGKEY1 = "delay";
         /** 隊(duì)列key2*/
         public static final String ROUTINGKEY2 = "delay_key";

         /**
          * 配置鏈接信息
          * @return
          */

         @Bean
         public ConnectionFactory connectionFactory() {
          CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
          
          connectionFactory.setUsername("kberp");
          connectionFactory.setPassword("kberp");
          connectionFactory.setVirtualHost("/");
          connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置
          return connectionFactory;
         }
         
         /**  
          * 配置消息交換機(jī)
             * 針對(duì)消費(fèi)者配置  
                FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念  
                HeadersExchange :通過(guò)添加屬性key-value匹配  
                DirectExchange:按照routingkey分發(fā)到指定隊(duì)列  
                TopicExchange:多關(guān)鍵字匹配  
             */
          
            @Bean  
            public DirectExchange defaultExchange() {  
             return new DirectExchange(EXCHANGE, truefalse);
            } 
           
            /**
             * 配置消息隊(duì)列2
             * 針對(duì)消費(fèi)者配置  
             * @return
             */

            @Bean
            public Queue queue() {  
               return new Queue("delay_queue2"true); //隊(duì)列持久  
          
            }
            /**
             * 將消息隊(duì)列2與交換機(jī)綁定
             * 針對(duì)消費(fèi)者配置  
             * @return
             */

            @Bean  
            @Autowired
            public Binding binding() {  
                return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  
            } 

            /**
             * 接受消息的監(jiān)聽(tīng),這個(gè)監(jiān)聽(tīng)會(huì)接受消息隊(duì)列1的消息
             * 針對(duì)消費(fèi)者配置  
             * @return
             */

            @Bean  
            @Autowired
            public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
                container.setQueues(queue());  
                container.setExposeListenerChannel(true);  
                container.setMaxConcurrentConsumers(1);  
                container.setConcurrentConsumers(1);  
                container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)  
                container.setMessageListener(new ChannelAwareMessageListener() {

           public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
            byte[] body = message.getBody();  
                        System.out.println("delay_queue2 收到消息 : " + new String(body));  
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費(fèi)  
            
           }  
          
                });  
                return container;  
            }  
            
           
        }
        在消息監(jiān)聽(tīng)中處理需要定時(shí)處理的任務(wù)就好了,因?yàn)镽abbitmq能發(fā)送消息,所以可以把任務(wù)特征碼發(fā)過(guò)來(lái),比如關(guān)閉訂單就把訂單id發(fā)過(guò)來(lái),這樣就避免了需要查詢一下那些訂單需要關(guān)閉而加重MySQL負(fù)擔(dān)了,畢竟一旦訂單量大的話,查詢本身也是一件很費(fèi)IO的事情

        總結(jié)

        基于Rabbitmq實(shí)現(xiàn)定時(shí)任務(wù),就是將消息設(shè)置一個(gè)過(guò)期時(shí)間,放入一個(gè)沒(méi)有讀取的隊(duì)列中,讓消息過(guò)期后自動(dòng)轉(zhuǎn)入另外一個(gè)隊(duì)列中,監(jiān)控這個(gè)隊(duì)列消息的監(jiān)聽(tīng)處來(lái)處理定時(shí)任務(wù)具體的操作。
        <END>

        推薦閱讀:

        剛進(jìn)美團(tuán),就被各種Code Review,真的有必要嗎?

        別再用 kill -9 了,這才是微服務(wù)上下線的正確姿勢(shì)!

        最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫(kù)、數(shù)據(jù)結(jié)構(gòu)等等。

        獲取方式:點(diǎn)個(gè)「在看」,點(diǎn)擊上方小卡片,進(jìn)入公眾號(hào)后回復(fù)「面試題」領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

        朕已閱 

        瀏覽 48
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            欧美福利在线观看 | jizz日本丝袜18老师免费 | 三级电影一区 | 《艳荡性史》免费观看 | 在线看一级成人小电影 | 韩国激情3小时14分合集 | 干B的视频在线 | 他扒开我的内裤把舌头伸进去 | 美国女人做爰全过程免费 | 一区二区三区四区在线看 |