1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        發(fā)送千萬級消息,我是這樣設計的

        共 5994字,需瀏覽 12分鐘

         ·

        2022-02-26 10:18

        我是3y,一年CRUD經驗用十年的markdown程序員???????常年被譽為優(yōu)質八股文選手

        前幾天,我講了什么是定時任務、為什么要用定時任務、什么是分布式定時任務、分布式定時任務的基礎以及austin項目接入分布式定時任務的背景。

        有幾個同學表示之前沒接觸過分布式定時任務,想讓我來austin的相關邏輯,于是就有了這篇文章。

        01、使用定時任務推送消息

        對于austin消息推送平臺而言,發(fā)送消息不單單是「技術側」調用接口進行發(fā)送的,還有很多場景是「運營側」通過設置定時進而推送。

        所以,作為使用消息推送平臺的角色可以簡單分為:「技術同學」、「運營同學」

        當運營或客服她們想發(fā)送消息給用戶,她們使用消息推送的后臺,步驟可簡單分為:

        1、確定發(fā)送時間

        2、確定發(fā)送的人群(人群內可以是1~N的用戶)

        3、確定發(fā)送文案

        對于發(fā)送時間則使用cron表達式,發(fā)送人群我們是讓運營上傳.csv文件,發(fā)送文案則配置在模板上(同樣也可以使用占位符

        這里值得講述的是,為什么是上傳.csv文件而不是excel,其最根本的原因:excel的行數(shù)是有大小限制的,而.csv的行數(shù)是沒有大小限制的。

        我們限定.csv文件的格式為如下:第一列填寫接收者Id、剩余的列如果使用了占位符,則需要填寫占位符變量名

        保存了消息模板之后,等我們點擊「啟動」按鈕,就根據模板的信息進行消息推送。

        (在線上環(huán)境上,在啟動之前肯定會有審核的環(huán)節(jié),并且一般會先點擊「測試」按鈕看文案是否正常才進行推送)

        點擊啟動按鈕了之后,會發(fā)生什么?我們看日志就懂了(我在執(zhí)行的過程中把關鍵的日志信息都打印出來了)

        //1.?消息模板ID的消息?定時任務被觸發(fā)(入口)
        2022-02-17?21:14:55.016?[Thread-61]?INFO??com.java3y.austin.cron.handler.CronTaskHandler?-?CronTaskHandler#execute?messageTemplateId:7?cron?exec!

        //2.?api-service接口接收到的參數(shù)信息以及接口返回值???
        2022-02-17?21:14:56.095?[pool-27-thread-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"bizId":"7","bizType":"SendService#batchSend","executionTime":72,"logId":"9fd2cdd1-79fa-4c5c-932b-da08aa3b247d","msg":"{\"code\":\"send\",\"messageParamList\":[{\"extra\":null,\"receiver\":\"13719383334,13719383336,13719383338,13719383340,13719383342,13719383344\",\"variables\":{\"content\":\"xixi\",\"url\":\"hehe.com\"}},{\"extra\":null,\"receiver\":\"13719383333,13719383335,13719383337,13719383339,13719383341,13719383343,13719383345\",\"variables\":{\"content\":\"hhaha\",\"url\":\"baidu.com\"}}],\"messageTemplateId\":7}","operateDate":1645103696095,"returnStr":"{\"code\":\"0\",\"msg\":\"操作成功\"}","success":true,"tag":"operation"}

        //3.?receiver消息隊列接收到的原始值
        2022-02-17?21:14:56.252?[org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"bizType":"Receiver#consumer","object":{"businessId":1000000720220217,"contentModel":{"content":"xixi","url":"hehe.com?track_code_bid=1000000720220217"},"idType":30,"messageTemplateId":7,"msgType":10,"receiver":["13719383340","13719383336","13719383338","13719383342","13719383344","13719383334"],"sendAccount":10,"sendChannel":30,"templateType":10},"timestamp":1645103696252}
        2022-02-17?21:14:56.254?[org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"bizType":"Receiver#consumer","object":{"businessId":1000000720220217,"contentModel":{"content":"hhaha","url":"baidu.com?track_code_bid=1000000720220217"},"idType":30,"messageTemplateId":7,"msgType":10,"receiver":["13719383341","13719383339","13719383335","13719383337","13719383343","13719383333","13719383345"],"sendAccount":10,"sendChannel":30,"templateType":10},"timestamp":1645103696253}

        //4.1?關鍵位置打印的日志(state=10)代表消息隊列接收成功
        2022-02-17?21:14:56.172?[org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"businessId":1000000720220217,"ids":["13719383340","13719383336","13719383338","13719383342","13719383344","13719383334"],"state":10,"timestamp":1645103696172}
        2022-02-17?21:14:56.253?[org.springframework.kafka.KafkaListenerEndpointContainer#6-0-C-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"businessId":1000000720220217,"ids":["13719383341","13719383339","13719383335","13719383337","13719383343","13719383333","13719383345"],"state":10,"timestamp":1645103696253}

        //4.2?關鍵位置打印的日志(state=60)代表消息調用接口發(fā)送失敗
        2022-02-17?21:14:56.799?[pool-8-thread-3]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"businessId":1000000720220217,"ids":["13719383340","13719383336","13719383338","13719383342","13719383344","13719383334"],"state":60,"timestamp":1645103696799}
        ??

        因為這次發(fā)送的渠道是短信,我們是有將短信的發(fā)送記錄入庫的,所以可以看看數(shù)據庫的記錄(剛好是13條,數(shù)據是沒問題的。至于發(fā)失敗,主要是該短信模板的參數(shù)不符合)

        我們整體的流程是沒有問題的

        02、代碼結構設計

        從消息推送后臺層面上,當點擊了「啟動」按鈕時,其實是「創(chuàng)建&&啟動」或者「啟動」了個定時任務(調用xxl-job的api,將定時任務存入到xxl-job的數(shù)據庫表中)

        等我們定時任務到時間點了,xxl-job的調度中心就找到我們的執(zhí)行器,進行調用。

        在創(chuàng)建定時任務的時候,我把消息模板ID寫入到了xxl-job任務信息里,所以當任務被調度的時候,我又從xxl-job把參數(shù)信息取出來。在這,打印出了一條日志,表示當前模板ID被調度執(zhí)行了。

        隨后,我使用「線程池」對定時任務做處理。

        因為我這里認為「讀取文件以及遠程調用發(fā)送接口」是一件比較耗時的工作,所以我這里直接就用線程池做了層異步,及時返回xxl-job,避免定時任務超時。

        解釋完為什么用了線程池以后,接著來看看讀取.csv文件這塊。在最最最開始的時候,我是直接一次性讀取,然后得到List列表的。顯然,這是不合理的。假設運營圈選的人群可能達到2000W人,那我直接將2000W條記錄直接load到內存,那是不對的。

        所以,每當從文件讀取一行,我就處理一行

        我在拿到每一行數(shù)據的時候,封裝了一個VO,又扔給了內存隊列LazyPending

        內存隊列里會起一個線程消費隊列里的數(shù)據,等到積壓到給定的size或者timeout就會給到實際消費者進行處理

        我這樣設計的目的在于:我要調用批量發(fā)送接口,使用內存隊列作為介質實現(xiàn)生產者和消費者模式為了batch處理。具體來說:如果我每讀取一行就調用一次發(fā)送接口,假設人群有2000W,我就需要調用2000W次。

        在實際生產環(huán)境中,austin-cronaustin-api很大概率上是分開部署的,所以每一次調用接口都是遠程調用。為了減少這個消耗,所以我這樣干了。

        另外,在具體執(zhí)行消費的時候,我是設計了「線程池」進行接口調用的,更能充分利用系統(tǒng)資源(畢竟這次接口調用更多的是損耗網絡的開銷)

        看到了這,再回到我們的接口打印信息:

        2022-02-17?21:14:56.095?[pool-27-thread-1]?INFO??com.java3y.austin.support.utils.LogUtils?-?{"bizId":"7","bizType":"SendService#batchSend","executionTime":72,"logId":"9fd2cdd1-79fa-4c5c-932b-da08aa3b247d","msg":"{\"code\":\"send\",\"messageParamList\":[{\"extra\":null,\"receiver\":\"13719383334,13719383336,13719383338,13719383340,13719383342,13719383344\",\"variables\":{\"content\":\"xixi\",\"url\":\"hehe.com\"}},{\"extra\":null,\"receiver\":\"13719383333,13719383335,13719383337,13719383339,13719383341,13719383343,13719383345\",\"variables\":{\"content\":\"hhaha\",\"url\":\"baidu.com\"}}],\"messageTemplateId\":7}","operateDate":1645103696095,"returnStr":"{\"code\":\"0\",\"msg\":\"操作成功\"}","success":true,"tag":"operation"}

        人群數(shù)量一共是13,但我們僅用了一次接口調用。

        03、總結

        到這里,我已經把定時任務的實現(xiàn)核心邏輯應該就講完了,大家看代碼的時候應該就有個譜了,至少應該不會跑到群里說看不懂了。

        至于實現(xiàn)的細節(jié)上,如果有更好的辦法或者思路可以在評論區(qū)一起討論,又或是直接提個PR。一般我認為是合理的,我都會審核通過的喲!

        看完整篇文章,很有可能就會有同學有疑惑:你把數(shù)據放在內存隊列里,這如果重啟或系統(tǒng)掛了怎么辦啊,數(shù)據不就丟了嗎。其實我認為這是一種權衡traff-off。

        我們在系統(tǒng)里要保證數(shù)據不丟失不重復需要做大量的工作,很有可能會影響到系統(tǒng)的性能或者支持并發(fā)的大小。如果是處理訂單類的系統(tǒng),那是必須的。但如果是發(fā)消息的場景,或者并沒有想象中那么重要(當然了,我們也可以實現(xiàn)就是啦)。

        但更多的是,我們可以額外通過一些手段來判斷消息是否下發(fā)成功了:大概就是統(tǒng)計當前消息模板的下發(fā)人數(shù)、系統(tǒng)處理過程中的人數(shù)以及消息到達、點擊的人數(shù)。這是系統(tǒng)核心功能以外的,但又很重要的功能,這幾天已經在實現(xiàn)了,這周有望寫完。

        對線面試官》公眾號還在持續(xù)分享面試題,沒關注的同學可以關注一波!這是austin項目的上一個系列,質量桿桿的


        austin項目Gitee鏈接https://gitee.com/zhongfucheng/austin

        austin項目GitHub鏈接:https://github.com/ZhongFuCheng3y/austin

        閱讀原文可跳轉austin倉庫

        瀏覽 290
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            99久久精品免费看国产 | 麻豆传媒无码 | 69xxxx18一19老师hd | 成人网站免费视频 | 操逼视频的网站 | 亚洲性爱影院 | 日本日逼视频网站 | 91乱子伦国产乱子伦! | 国产三级香港三韩国三级 | 日本夫妻性生活视频 |