1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        延遲隊(duì)列實(shí)現(xiàn),定時(shí)任務(wù),關(guān)閉訂單

        共 6230字,需瀏覽 13分鐘

         ·

        2020-10-02 17:56

        場(chǎng)景

        開(kāi)發(fā)中經(jīng)常需要用到定時(shí)任務(wù),對(duì)于商城來(lái)說(shuō),定時(shí)任務(wù)尤其多,比如優(yōu)惠券定時(shí)過(guò)期、訂單定時(shí)關(guān)閉、微信支付2小時(shí)未支付關(guān)閉訂單等等,都需要用到定時(shí)任務(wù),但是定時(shí)任務(wù)本身有一個(gè)問(wèn)題,一般來(lái)說(shuō)我們都是通過(guò)定時(shí)輪詢(xún)查詢(xún)數(shù)據(jù)庫(kù)來(lái)判斷是否有任務(wù)需要執(zhí)行,也就是說(shuō)不管怎么樣,我們需要先查詢(xún)數(shù)據(jù)庫(kù),而且有些任務(wù)對(duì)時(shí)間準(zhǔn)確要求比較高的,需要每秒查詢(xún)一次,對(duì)于系統(tǒng)小倒是無(wú)所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實(shí)了,所以需要其他方式的,當(dāng)然實(shí)現(xiàn)的方式有多種多樣的,比如Redis實(shí)現(xiàn)定時(shí)隊(duì)列、基于優(yōu)先級(jí)隊(duì)列的JDK延遲隊(duì)列、時(shí)間輪等。因?yàn)槲覀冺?xiàng)目中本身就使用到了Rabbitmq,所以基于方便開(kāi)發(fā)和維護(hù)的原則,我們使用了Rabbitmq延遲隊(duì)列來(lái)實(shí)現(xiàn)定時(shí)任務(wù),不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot集成RabbitMQ

        Rabbitmq延遲隊(duì)列

        Rabbitmq本身是沒(méi)有延遲隊(duì)列的,只能通過(guò)Rabbitmq本身隊(duì)列的特性來(lái)實(shí)現(xiàn),想要Rabbitmq實(shí)現(xiàn)延遲隊(duì)列,需要使用Rabbitmq的死信交換機(jī)(Exchange)和消息的存活時(shí)間TTL(Time To Live)

        死信交換機(jī)

        一個(gè)消息在滿(mǎn)足如下條件下,會(huì)進(jìn)死信交換機(jī),記住這里是交換機(jī)而不是隊(duì)列,一個(gè)交換機(jī)可以對(duì)應(yīng)很多隊(duì)列。

        一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。

        上面的消息的TTL到了,消息過(guò)期了。

        隊(duì)列的長(zhǎng)度限制滿(mǎn)了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。

        死信交換機(jī)就是普通的交換機(jī),只是因?yàn)槲覀儼堰^(guò)期的消息扔進(jìn)去,所以叫死信交換機(jī),并不是說(shuō)死信交換機(jī)是某種特定的交換機(jī)

        消息TTL(消息存活時(shí)間)

        消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱(chēng)之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。

        1. byte[] messageBodyBytes = "Hello, world!".getBytes(); ?

        2. AMQP.BasicProperties properties = new AMQP.BasicProperties(); ?

        3. properties.setExpiration("60000"); ?

        4. channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes); ?

        可以通過(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫(xiě)個(gè)int類(lèi)型的字符串:當(dāng)上面的消息扔到隊(duì)列中后,過(guò)了60秒,如果沒(méi)有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒(méi)有“死掉”的消息對(duì)頂上來(lái),被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去

        處理流程圖

        創(chuàng)建交換機(jī)(Exchanges)和隊(duì)列(Queues)

        創(chuàng)建死信交換機(jī)

        如圖所示,就是創(chuàng)建一個(gè)普通的交換機(jī),這里為了方便區(qū)分,把交換機(jī)的名字取為:delay

        創(chuàng)建自動(dòng)過(guò)期消息隊(duì)列

        這個(gè)隊(duì)列的主要作用是讓消息定時(shí)過(guò)期的,比如我們需要2小時(shí)候關(guān)閉訂單,我們就需要把消息放進(jìn)這個(gè)隊(duì)列里面,把消息過(guò)期時(shí)間設(shè)置為2小時(shí)創(chuàng)建一個(gè)一個(gè)名為delay_queue1的自動(dòng)過(guò)期的隊(duì)列,當(dāng)然圖片上面的參數(shù)并不會(huì)讓消息自動(dòng)過(guò)期,因?yàn)槲覀儾](méi)有設(shè)置x-message-ttl參數(shù),如果整個(gè)隊(duì)列的消息有消息都是相同的,可以設(shè)置,這里為了靈活,所以并沒(méi)有設(shè)置,另外兩個(gè)參數(shù)x-dead-letter-exchange代表消息過(guò)期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過(guò)期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個(gè)道理,根據(jù)這個(gè)key將消息放入不同的隊(duì)列

        創(chuàng)建消息處理隊(duì)列

        這個(gè)隊(duì)列才是真正處理消息的隊(duì)列,所有進(jìn)入這個(gè)隊(duì)列的消息都會(huì)被處理消息隊(duì)列的名字為delay_queue2

        消息隊(duì)列綁定到交換機(jī)

        進(jìn)入交換機(jī)詳情頁(yè)面,將創(chuàng)建的2個(gè)隊(duì)列(delayqueue1和delayqueue2)綁定到交換機(jī)上面自動(dòng)過(guò)期消息隊(duì)列的routing key 設(shè)置為delay

        綁定delayqueue2delayqueue2 的key要設(shè)置為創(chuàng)建自動(dòng)過(guò)期的隊(duì)列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過(guò)期的時(shí)候就可以自動(dòng)把消息放入delay_queue2這個(gè)隊(duì)列中了

        綁定后的管理頁(yè)面如下圖:

        當(dāng)然這個(gè)綁定也可以使用代碼來(lái)實(shí)現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺(tái)來(lái)操作

        發(fā)送消息

        1. String msg = "hello word"; ?

        2. MessageProperties messageProperties = newMessageProperties(); ?

        3. ? ? ? ?messageProperties.setExpiration("6000");

        4. ? ? ? ?messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());

        5. ? ? ? ?Message message = newMessage(msg.getBytes(), messageProperties);

        6. ? ? ? ?rabbitTemplate.convertAndSend("delay", "delay",message);

        主要的代碼就是

        1. messageProperties.setExpiration("6000"); ?

        設(shè)置了讓消息6秒后過(guò)期

        注意:因?yàn)橐屜⒆詣?dòng)過(guò)期,所以一定不能設(shè)置delay_queue1的監(jiān)聽(tīng),不能讓這個(gè)隊(duì)列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過(guò)期了

        接收消息

        接收消息配置好delay_queue2的監(jiān)聽(tīng)就好了

        1. package wang.raye.rabbitmq.demo1;

        2. import org.springframework.amqp.core.AcknowledgeMode; ?

        3. import org.springframework.amqp.core.Binding; ?

        4. import org.springframework.amqp.core.BindingBuilder; ?

        5. import org.springframework.amqp.core.DirectExchange; ?

        6. import org.springframework.amqp.core.Message; ?

        7. import org.springframework.amqp.core.Queue; ?

        8. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; ?

        9. import org.springframework.amqp.rabbit.connection.ConnectionFactory; ?

        10. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; ?

        11. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; ?

        12. import org.springframework.beans.factory.annotation.Autowired; ?

        13. import org.springframework.context.annotation.Bean; ?

        14. import org.springframework.context.annotation.Configuration;

        15. @Configuration

        16. publicclassDelayQueue{ ?

        17. ? ?/** 消息交換機(jī)的名字*/

        18. ? ?publicstaticfinalString EXCHANGE = "delay";

        19. ? ?/** 隊(duì)列key1*/

        20. ? ?publicstaticfinalString ROUTINGKEY1 = "delay";

        21. ? ?/** 隊(duì)列key2*/

        22. ? ?publicstaticfinalString ROUTINGKEY2 = "delay_key";

        23. ? ?/**

        24. ? ? * 配置鏈接信息

        25. ? ? * @return

        26. ? ? */

        27. ? ?@Bean

        28. ? ?publicConnectionFactory connectionFactory() {

        29. ? ? ? ?CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);

        30. ? ? ? ?connectionFactory.setUsername("kberp");

        31. ? ? ? ?connectionFactory.setPassword("kberp");

        32. ? ? ? ?connectionFactory.setVirtualHost("/");

        33. ? ? ? ?connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置

        34. ? ? ? ?return connectionFactory;

        35. ? ?}

        36. ? ?/** ?

        37. ? ? * 配置消息交換機(jī)

        38. ? ? * 針對(duì)消費(fèi)者配置 ?

        39. ? ? ? ?FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無(wú)routingkey的概念 ?

        40. ? ? ? HeadersExchange :通過(guò)添加屬性key-value匹配 ?

        41. ? ? ? ?DirectExchange:按照routingkey分發(fā)到指定隊(duì)列 ?

        42. ? ? ? ?TopicExchange:多關(guān)鍵字匹配 ?

        43. ? ? */ ?

        44. ? ?@Bean ?

        45. ? ?publicDirectExchange defaultExchange() { ?

        46. ? ? ? ?returnnewDirectExchange(EXCHANGE, true, false);

        47. ? ?}

        48. ? ?/**

        49. ? ? * 配置消息隊(duì)列2

        50. ? ? * 針對(duì)消費(fèi)者配置 ?

        51. ? ? * @return

        52. ? ? */

        53. ? ?@Bean

        54. ? ?publicQueue queue() { ?

        55. ? ? ? returnnewQueue("delay_queue2", true); //隊(duì)列持久 ?

        56. ? ?}

        57. ? ?/**

        58. ? ? * 將消息隊(duì)列2與交換機(jī)綁定

        59. ? ? * 針對(duì)消費(fèi)者配置 ?

        60. ? ? * @return

        61. ? ? */

        62. ? ?@Bean ?

        63. ? ?@Autowired

        64. ? ?publicBinding binding() { ?

        65. ? ? ? ?returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); ?

        66. ? ?}

        67. ? ?/**

        68. ? ? * 接受消息的監(jiān)聽(tīng),這個(gè)監(jiān)聽(tīng)會(huì)接受消息隊(duì)列1的消息

        69. ? ? * 針對(duì)消費(fèi)者配置 ?

        70. ? ? * @return

        71. ? ? */

        72. ? ?@Bean ?

        73. ? ?@Autowired

        74. ? ?publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { ?

        75. ? ? ? ?SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory()); ?

        76. ? ? ? ?container.setQueues(queue()); ?

        77. ? ? ? ?container.setExposeListenerChannel(true); ?

        78. ? ? ? ?container.setMaxConcurrentConsumers(1); ?

        79. ? ? ? ?container.setConcurrentConsumers(1); ?

        80. ? ? ? ?container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn) ?

        81. ? ? ? ?container.setMessageListener(newChannelAwareMessageListener() {

        82. ? ? ? ? ? ?publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{

        83. ? ? ? ? ? ? ? ?byte[] body = message.getBody(); ?

        84. ? ? ? ? ? ? ? ?System.out.println("delay_queue2 收到消息 : "+ newString(body)); ?

        85. ? ? ? ? ? ? ? ?channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費(fèi) ?

        86. ? ? ? ? ? ?} ?

        87. ? ? ? ?}); ?

        88. ? ? ? ?return container; ?

        89. ? ?} ?

        90. }

        在消息監(jiān)聽(tīng)中處理需要定時(shí)處理的任務(wù)就好了,因?yàn)镽abbitmq能發(fā)送消息,所以可以把任務(wù)特征碼發(fā)過(guò)來(lái),比如關(guān)閉訂單就把訂單id發(fā)過(guò)來(lái),這樣就避免了需要查詢(xún)一下那些訂單需要關(guān)閉而加重MySQL負(fù)擔(dān)了,畢竟一旦訂單量大的話,查詢(xún)本身也是一件很費(fèi)IO的事情

        總結(jié)

        基于Rabbitmq實(shí)現(xiàn)定時(shí)任務(wù),就是將消息設(shè)置一個(gè)過(guò)期時(shí)間,放入一個(gè)沒(méi)有讀取的隊(duì)列中,讓消息過(guò)期后自動(dòng)轉(zhuǎn)入另外一個(gè)隊(duì)列中,監(jiān)控這個(gè)隊(duì)列消息的監(jiān)聽(tīng)處來(lái)處理定時(shí)任務(wù)具體的操作

        瀏覽 56
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            成年人黄色视频在线观看 | 少妇把腿扒开让我爽爽小说 | 日韩一级看片 | 91日B| 欧美aaaaa | 欧美三级电影在线播放 | 攻被扒开双腿强行侵犯小说 | 成人无码精品视频 | 真空取快递被邻居发现 | 淫荡五月 |