1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        delaymq高性能的任意延時消息隊列

        聯(lián)合創(chuàng)作 · 2023-10-01 04:02

        RocketMQ 開源版本任意時間延時隊列實(shí)現(xiàn)

        定時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是推遲到在當(dāng)前時間點(diǎn)之后的某一個時間投遞到Consumer進(jìn)行消費(fèi),該消息即定時消息。

        延時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進(jìn)行消費(fèi),該消息即延時消息。

        定時消息與延時消息在代碼配置上存在一些差異,但是最終達(dá)到的效果相同:消息在發(fā)送到消息隊列RocketMQ版服務(wù)端后并不會立馬投遞,而是根據(jù)消息中的屬性延遲固定時間后才投遞給消費(fèi)者。

        適用場景

        定時消息和延時消息適用于以下一些場景:

        消息生產(chǎn)和消費(fèi)有時間窗口要求,例如在電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條延時消息。

        這條消息將會在30分鐘以后投遞給消費(fèi)者,消費(fèi)者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。

        如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。

        通過消息觸發(fā)一些定時任務(wù),例如在某一固定時間點(diǎn)向用戶發(fā)送提醒消息。

        使用方式 定時消息和延時消息的使用在代碼編寫上存在略微的區(qū)別:

        發(fā)送定時消息需要明確指定消息發(fā)送時間點(diǎn)之后的某一時間點(diǎn)作為消息投遞的時間點(diǎn)。

        發(fā)送延時消息時需要設(shè)定一個延時時間長度,消息將從當(dāng)前發(fā)送時間點(diǎn)開始延遲固定時間之后才開始投遞。

        注意事項

        定時消息的精度會有1s~2s的延遲誤差。

        定時和延時消息的msg.setStartDeliverTime參數(shù)需要設(shè)置成當(dāng)前時間戳之后的某個時刻(單位毫秒)。

        如果被設(shè)置成當(dāng)前時間戳之前的某個時刻,消息將立刻投遞給消費(fèi)者。

        定時和延時消息的msg.setStartDeliverTime參數(shù)可設(shè)置40天內(nèi)的任何時刻(單位毫秒),超過40天消息發(fā)送將失敗。

        StartDeliverTime是服務(wù)端開始向消費(fèi)端投遞的時間。如果消費(fèi)者當(dāng)前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴(yán)格按照配置的時間進(jìn)行投遞。

        由于客戶端和服務(wù)端可能存在時間差,消息的實(shí)際投遞時間與客戶端設(shè)置的投遞時間之間可能存在偏差。

        如何使用

        推薦使用阿里云提供的rocketmq版本的pom

               <dependency>
                    <groupId>com.aliyun.openservicesgroupId>
                    <artifactId>ons-clientartifactId>
                    <version>1.8.4.Finalversion>
                dependency>
        

        消息發(fā)送

        import com.aliyun.openservices.ons.api.*;
        import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;
        
        import java.util.Date;
        import java.util.Properties;
        
        public class ProducerDelayTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // AccessKey ID阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
                properties.put(PropertyKeyConst.AccessKey, "XXX");
                // AccessKey Secret阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // 設(shè)置TCP接入域名,進(jìn)入消息隊列RocketMQ版控制臺實(shí)例詳情頁面的接入點(diǎn)區(qū)域查看。
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");
                
        
                Producer producer = ONSFactory.createProducer(properties);
                // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。
                producer.start();
        
                 {
                    Message msg = new Message(
                            // 您在消息隊列RocketMQ版控制臺創(chuàng)建的Topic。
                            "TopicTest",
                            // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務(wù)器過濾。
                            "TagA",
                            // Message Body可以是任何二進(jìn)制形式的數(shù)據(jù),消息隊列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。
                            "演示15秒鐘>>> ".getBytes());
                    // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。
                    // 以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補(bǔ)發(fā)。
                    // 注意:不設(shè)置也不會影響消息正常收發(fā)。
                    msg.setKey("ORDERID_100e");
                    try {
                        // 延時消息,單位毫秒(ms),在指定延遲時間(當(dāng)前時間之后)進(jìn)行投遞,例如消息在15秒后投遞。
                        long delayTime = System.currentTimeMillis() + 15000;
                        System.out.println("發(fā)送時間>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss"));
        
                        // 設(shè)置消息需要被投遞的時間。
                        msg.setStartDeliverTime(delayTime);
        
                        SendResult sendResult = producer.send(msg);
                        // 同步發(fā)送消息,只要不拋異常就是成功。
                        if (sendResult != null) {
                            System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                        }
                    } catch (Exception e) {
                        // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理。
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                        e.printStackTrace();
                    }
                }
                // 在應(yīng)用退出前,銷毀Producer對象。
                // 注意:如果不銷毀也沒有問題。
                producer.shutdown();
            }
        }           
        
        
        瀏覽 17
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        編輯 分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        編輯 分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            亚洲精品国偷拍自产在线观看蜜桃 | 特级西西444www大精品视频免费看 | 欧美污污视频 | 公交车被弄到高潮嗯啊视频 | 男人吃女人胸视频 | 中国操逼大片 | 男模隐私网站不遮挡无内裤 | 少妇老师勾引我做爰电影 | 欧美乱战大交xxxxx | 韩国精品毛片 |