1. 要將RocketMQ中臺化,有點小激動

        共 8517字,需瀏覽 18分鐘

         ·

        2021-06-13 15:05


        本文字數(shù):4223

        預(yù)計閱讀時間:13分鐘


        一、RocketMQ簡介

        RocketMQ是一個高可用、高性能、高可靠的分布式消息隊列,相對于kafka更適合處理業(yè)務(wù)系統(tǒng)之間的消息。

        • 它具有很多特性,例如:

          • 發(fā)布訂閱
          • 順序、事務(wù)、定時消息
          • 消息堆積、重試,回溯等等
        • 它通過同步刷盤同步雙寫等技術(shù)手段來實現(xiàn)高可靠,保證如下情況消息不丟:

          • 可恢復(fù)性故障:broker或OS crash等
          • 不可恢復(fù)性故障:磁盤損壞等
        • 它采用多項技術(shù)優(yōu)化來滿足性能要求:

          • 順序IO
          • PageCache和mmap
          • 內(nèi)存預(yù)熱和鎖定
          • 異步提交和刷盤
          • 堆外內(nèi)存緩沖等等

        所以,它的本質(zhì)決定的其架構(gòu)一定是復(fù)雜的,參考RocketMQ官方架構(gòu)圖:

        這里不再介紹各個組件的含義,可以參考RocketMQ架構(gòu)設(shè)計。

        RocketMQ經(jīng)過阿里多年雙十一的檢驗,其穩(wěn)定性不言而喻。

        可作為搜狐視頻的消息中臺,還需要很長一段路要走,為什么這么說呢?

        二、運維之痛

        早在2014年我們就引入了RocketMQ作為消息中間件,其附帶了基本的命令行工具。

        但是命令行運維此等龐然大物會讓人感到力不從心,好在社區(qū)提供了一個web控制臺:RocketMQ-Console。

        在初期,簡單的控制臺已經(jīng)能滿足基本的需求。但是隨著各個業(yè)務(wù)逐漸接入,需求也紛至沓來。

        我們在RocketMQ-Console的修修補補已經(jīng)無法滿足了,主要體現(xiàn)在如下幾點:

        • 從業(yè)務(wù)方的角度:
          • 偏重運維,一般業(yè)務(wù)用戶不關(guān)心集群的數(shù)據(jù)和狀態(tài),無法聚焦。
          • 使用起來繁瑣,且直接操作集群,易誤操作。
          • 沒有監(jiān)控預(yù)警功能。
          • 無法滿足業(yè)務(wù)用戶的需求,包括但不限于:
            • 序列化
            • trace
            • 流控,隔離降級
            • 埋點統(tǒng)計監(jiān)控等等
          • 一些隱性問題無法解決。
        • 從管理員維度:
          • 無用戶概念,任何人都能直接操作集群,易誤操作且比較危險。
          • 無集群管理功能,日常更新或機器替換需要手動部署,非常耗時、麻煩且易出錯。
          • 無相關(guān)數(shù)據(jù)統(tǒng)計,監(jiān)控,預(yù)警等,往往有問題不能及時發(fā)現(xiàn)。

        另外,RocketMQ有一些潛在約定、使用規(guī)范、最佳實踐、bug或優(yōu)化等等,用文檔說明也無濟于事。

        所以與其寫文檔不如將經(jīng)驗和實踐轉(zhuǎn)換為產(chǎn)品,能夠更好的服務(wù)于業(yè)務(wù)及運維集群,于是MQCloud應(yīng)運而生。

        三、MQCloud誕生

        先看一下MQCloud的定位:

        它是集客戶端SDK,監(jiān)控預(yù)警,集群運維于一體的一站式服務(wù)平臺。

        MQCloud的系統(tǒng)架構(gòu)如下:

        下面來分別說明一下MQCloud如何解決上面提到的痛點。

        業(yè)務(wù)端和運維端分離,使業(yè)務(wù)用戶只聚焦于業(yè)務(wù)數(shù)據(jù)。
        為了實現(xiàn)這個目的,引入了用戶,資源兩大維度。
        針對用戶和資源加以控制,使不同的用戶只聚焦于自己的數(shù)據(jù)。
          • 對于生產(chǎn)者來說,他關(guān)心的是topic配置,消息的發(fā)送數(shù)據(jù),誰在消費等等問題,這樣只對他展示相應(yīng)的數(shù)據(jù)即可;
          • 對于消費者來說,只關(guān)心消費狀況,有沒有堆積,消費失敗等情況;
          • 對于管理員來說,可以進行部署,監(jiān)控,統(tǒng)一配置,審批等日常運維;
        清晰明了的操作
        通過對不同角色展示不同的視圖,使用戶可以進行的操作一目了然。
        規(guī)范和安全
        為了保障集群操作的安全性和規(guī)范性,所有的操作都會以申請單的形式進入后臺審批系統(tǒng),管理員來進行相關(guān)審批,安全性大大提升。
        多維的數(shù)據(jù)統(tǒng)計和監(jiān)控預(yù)警
        MQCloud核心功能之一就是監(jiān)控,要想做監(jiān)控,必須先做統(tǒng)計,為了更好的知道RocketMQ集群的運行狀況,MQCloud做了大量的統(tǒng)計工作,主要包括如下幾項:
          1. 每分鐘topic的生產(chǎn)流量:用于繪制topic生產(chǎn)流量圖及監(jiān)控預(yù)警。
          2. 每分鐘消費者流量:用于繪制消費流量圖及監(jiān)控預(yù)警。
          3. 每10分鐘topic生產(chǎn)流量:用于按照流量展示topic排序。
          4. 每分鐘broker生產(chǎn)、消費流量:用于繪制broker生產(chǎn)消費流量圖。
          5. 每分鐘broker集群生產(chǎn)、消費流量:用于繪制broker集群的生產(chǎn)流量圖。
          6. 每分鐘生產(chǎn)者百分位耗時、異常統(tǒng)計:以ip維度繪制每個生產(chǎn)者的耗時流量圖及監(jiān)控預(yù)警。
          7. 機器的cpu,內(nèi)存,io,網(wǎng)絡(luò)流量,網(wǎng)絡(luò)連接等統(tǒng)計:用于服務(wù)器的狀況圖和監(jiān)控預(yù)警。
        下面來分別介紹每項統(tǒng)計是如何收集的:
        每分鐘topic的生產(chǎn)流量
        此數(shù)據(jù)來自于RocketMQ broker端BrokerStatsManager,其提供了統(tǒng)計功能,統(tǒng)計項如下:
        1. TOPIC_PUT_NUMS:某topic消息生產(chǎn)條數(shù),向某個topic寫入消息成功才算

          寫入成功包括四種狀態(tài):PUT_OK,F(xiàn)LUSH_DISK_TIMEOUT,F(xiàn)LUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE

        2. TOPIC_PUT_SIZE:某topic消息生產(chǎn)大小,向某個topic寫入消息成功才算

        RocketMQ實現(xiàn)的統(tǒng)計邏輯較為精巧,這里做簡單描述,首先介紹幾個對象:
        1. StatsItemSet主要字段及方法如下:

          ConcurrentMap<String/* statsKey */, StatsItem> statsItemTable; // statsKey<->StatsItem
          // 針對某個數(shù)據(jù)項進行記錄
          public void addValue(final String statsKey, final int incValue, final int incTimes) {
              StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
              statsItem.getValue().addAndGet(incValue);
              statsItem.getTimes().addAndGet(incTimes);
          }
          // 獲取并創(chuàng)建StatsItem
          public StatsItem getAndCreateStatsItem(final String statsKey) {
              StatsItem statsItem = this.statsItemTable.get(statsKey);
              if (null == statsItem) {
                  statsItem = new StatsItem(this.statsName, statsKey);
                  this.statsItemTable.put(statsKey, statsItem);
              }
              return statsItem;
          }

        2. StatsItem主要字段及方法如下:

          AtomicLong value; // 統(tǒng)計數(shù)據(jù):比如消息條數(shù),消息大小
          AtomicLong times; // 次數(shù)
          LinkedList<CallSnapshot> csListMinute; // 每分鐘快照數(shù)據(jù)
          LinkedList<CallSnapshot> csListHour; // 每小時快照數(shù)據(jù)
          LinkedList<CallSnapshot> csListDay; // 每天快照數(shù)據(jù)
          // 分鐘采樣
          public void samplingInSeconds() {
              synchronized (csListMinute) {
                  csListMinute.add(new CallSnapshot(System.currentTimeMillis(), times.get(), value.get()));
                  if (csListMinute.size() > 7) {
                      csListMinute.removeFirst();
                  }
              }
          }
          // 小時采樣
          public void samplingInMinutes() {
            // ...代碼省略
          }
          // 天采樣
          public void samplingInHour() {
            // ...代碼省略
          }

        3. CallSnapshot主要字段如下:

          long times; // 次數(shù)快照
          long value; // 統(tǒng)計數(shù)據(jù)快照
          long timestamp; //快照時間戳

        上面三個對象如何配合進行數(shù)據(jù)統(tǒng)計呢?舉個例子,比如統(tǒng)計topic名字為test_topic的消息生產(chǎn)大小:
        只要進行類似如下調(diào)用即可:

        StatsItemSet.addValue("test_topic", 123125123, 1)

        即表示發(fā)送了1次消息到test_topic,消息大小為123125123。
        那如何進行數(shù)據(jù)采樣呢?StatsItemSet內(nèi)置了定時任務(wù),比如其每10秒調(diào)用一次StatsItem.samplingInSeconds()。這樣StatsItem就會持有60秒的數(shù)據(jù),類似如下結(jié)構(gòu):
        那么,最后一個10秒的快照 - 第一個10秒的快照 = 當(dāng)前60秒的數(shù)據(jù),根據(jù)時間戳差值可以得到耗時。
        類似,小時數(shù)據(jù)每10分鐘進行一次快照,類似如下結(jié)構(gòu):
        天數(shù)據(jù)每1小時進行一次快照,類似如下結(jié)構(gòu):
        MQCloud每分鐘遍歷查詢集群下所有broker來查詢RocketMQ統(tǒng)計好的分鐘數(shù)據(jù),然后進行存儲。
        每分鐘消費者流量
        與每分鐘topic的生產(chǎn)流量一樣,也采用RocketMQ統(tǒng)計好的數(shù)據(jù)。
        每10分鐘topic生產(chǎn)流量
        采用數(shù)據(jù)庫已經(jīng)統(tǒng)計好的每分鐘topic流量進行累加,統(tǒng)計出10分鐘流量。
        每分鐘broker生產(chǎn)、消費流量
        由于統(tǒng)計1.每分鐘topic的生產(chǎn)流量和2.每分鐘消費者流量時是跟broker交互獲取的,所以知道broker ip,故直接按照broker維度存儲一份數(shù)據(jù)即可。
        每分鐘broker集群生產(chǎn)、消費流量
        采用4.每分鐘broker生產(chǎn)、消費流量數(shù)據(jù),按照集群求和即可。
        每分鐘生產(chǎn)者百分位耗時、異常統(tǒng)計
        由于RocketMQ并沒有提供生產(chǎn)者的流量統(tǒng)計(只提供了topic,但是并不知道每個生產(chǎn)者的情況),所以MQCloud實現(xiàn)了對生產(chǎn)者數(shù)據(jù)進行統(tǒng)計(通過RocketMQ的回調(diào)鉤子實現(xiàn)):
        主要統(tǒng)計如下信息:
          1. 客戶端ip->broker ip
          2. 發(fā)送消息耗時
          3. 消息數(shù)量
          4. 發(fā)送異常
        統(tǒng)計完成后,定時發(fā)送到MQCloud進行存儲,并做實時監(jiān)控和展示。
        關(guān)于統(tǒng)計部分有一點說明,一般耗時統(tǒng)計有最大,最小和平均值,而通常99%(即99%的請求耗時都低于此數(shù)值)的請求的耗時情況才能反映真實響應(yīng)情況。99%請求耗時統(tǒng)計最大的問題是如何控制內(nèi)存占用,因為需要對某段時間內(nèi)所有的耗時做排序后才能統(tǒng)計出這段時間的99%的耗時狀況。而對于流式數(shù)據(jù)做這樣的統(tǒng)計是有一些算法和數(shù)據(jù)結(jié)構(gòu)的,例如t-digest,但是MQCloud采用了非精確的但是較為簡單的分段統(tǒng)計的方法,具體如下:
        1. 創(chuàng)建一個按照最大耗時預(yù)哈希的時間跨度不同的耗時分段數(shù)組

          優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可

          缺點:分段精度需要提前設(shè)定好,且不可更改

          1. 第一段:耗時范圍0ms~10ms,時間跨度為1ms。

          2. 第二組:耗時范圍11ms~100ms,時間跨度5ms。

          3. 第三組:耗時范圍101ms~3500ms,時間跨度50ms。

          優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可

          缺點:分段精度需要提前設(shè)定好,且不可更

        2. 針對上面的分段數(shù)組,創(chuàng)建一個大小對應(yīng)的AtomicLong的計數(shù)數(shù)組,支持并發(fā)統(tǒng)計:

        3. 耗時統(tǒng)計時,計算耗時對應(yīng)的耗時分段數(shù)組下標(biāo),然后調(diào)用計數(shù)數(shù)組進行統(tǒng)計即可,參考下圖:

          這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

          1. 例如某次耗時為18ms,首先找到它所屬的區(qū)間,即歸屬于[16~20]ms之間,對應(yīng)的數(shù)組下標(biāo)為12。
          2. 根據(jù)第一步找到的數(shù)組下標(biāo)12,獲取對應(yīng)的計數(shù)數(shù)組下標(biāo)12。
          3. 獲取對應(yīng)的計數(shù)器進行+1操作,即表示18ms發(fā)生了一次調(diào)用。

          這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

        4. 然后定時采樣任務(wù)會每分鐘對計數(shù)數(shù)組進行快照,產(chǎn)生如下耗時數(shù)據(jù)

        5. 由于上面的耗時數(shù)據(jù)天然就是排好序的,可以很容易計算99%、90%、平均耗時等數(shù)據(jù)了。

        另外提一點,由于RocketMQ 4.4.0新增的trace功能也使用hook來實現(xiàn),與MQCloud的統(tǒng)計有沖突,MQCloud已經(jīng)做了兼容。
        Trace和統(tǒng)計是兩種維度,trace反映的是消息從生產(chǎn)->存儲->消費的流程,而MQCloud做的是針對生產(chǎn)者狀況的統(tǒng)計,有了這些統(tǒng)計數(shù)據(jù),才可以做到生產(chǎn)耗時情況展示,生產(chǎn)異常情況預(yù)警等功能。

        機器統(tǒng)計

        關(guān)于集群狀況收集主要采用了將nmon自動放置到/tmp目錄,定時采用ssh連接到機器執(zhí)行nmon命令,解析返回的數(shù)據(jù),然后進行存儲。

        上面這些工作就為監(jiān)控和預(yù)警奠定了堅實的數(shù)據(jù)基礎(chǔ)。

        單獨定制的客戶端

        針對客戶端的一些需求,mq-client在rocketmq-client的基礎(chǔ)上進行了開發(fā)定制:
        1. 多集群支持

          MQCloud儲存了生產(chǎn)者、消費者和集群的關(guān)系,通過路由適配,客戶端可以自動路由到目標(biāo)集群上,使客戶端對多集群透明。

        2. trace

          通過搭建單獨的trace集群和定制客戶端,使trace數(shù)據(jù)能夠發(fā)往獨立的集群,防止影響主集群。

        3. 序列化

          通過集成不同的序列化機制,配合MQCloud,客戶端無需關(guān)心序列化問題。

          目前支持的序列化為protobuf和json,并且通過類型檢測支持在線修改序列化方式。

        4. 流控

          通過提供令牌桶和漏桶限流機制,自動開啟流控機制,防止消息洪峰沖垮業(yè)務(wù)端。

        5. 隔離降級

          使用hystrix提供隔離降級策略,使業(yè)務(wù)端在broker故障時可以避免拖累。

        6. 埋點監(jiān)控

          通過對客戶端數(shù)據(jù)進行統(tǒng)計,收集,在MQCloud里進行監(jiān)控,使客戶端任何風(fēng)吹草動都能及時得知。

        7. 規(guī)范問題

          通過編碼保障,使某些約定,規(guī)范和最佳實踐得以實現(xiàn)。包括但不限于:

          • 命名規(guī)范
          • 消費組全局唯一,防止重復(fù)導(dǎo)致消費問題
          • 重試消息跳過
          • 安全關(guān)閉等等
          • 更完善的重試機制
        自動化運維
          1. 部署

            手動部署一臺broker實例沒什么問題,但是當(dāng)實例變多時,手動部署極易出錯且耗時耗力。

            MQCloud提供了一套自動化部署機制,并支持配置模板功能,支持一鍵部署。

          2. 機器運維

            MQCloud提供了一整套機器的運維機制,包括上下線,機器狀況收集、監(jiān)控、預(yù)警等等,大大提升了生產(chǎn)力。

        安全性加固

        一、開啟管理員權(quán)限

        RocketMQ從4.4.0開始支持ACL,但是默認沒有開啟,也就是任何人使用管理工具或API就可以直接操縱線上集群。但是開啟ACL對現(xiàn)有業(yè)務(wù)影響太大,針對這種情況MQCloud進行專門定制。

        借鑒RocketMQ ACL機制,只針對RocketMQ管理員操作加固權(quán)限校驗:

        并且支持自定義和熱加載管理員請求碼,使得非法操作RocketMQ集群成為不可能,安全性大大提升。


        二、broker通信加固

        broker同步數(shù)據(jù)代碼由于沒有校驗,存在安全隱患,只要連接master監(jiān)聽的slave通信端口,發(fā)送數(shù)據(jù)大于8個字節(jié),就可能導(dǎo)致同步偏移量錯誤,代碼如下:

        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
          int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
          long readOffset = this.byteBufferRead.getLong(pos - 8);
          this.processPostion = pos;
          HAConnection.this.slaveAckOffset = readOffset;
          if (HAConnection.this.slaveRequestOffset < 0) {
              HAConnection.this.slaveRequestOffset = readOffset;
              log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
          }
          HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        }

        MQCloud通過驗證數(shù)據(jù)首包的策略,保障了通信的安全性。

        目前MQCloud運維規(guī)模如下:

        • 服務(wù)器:50臺+
        • 集群:5個+
        • topic:700個+
        • 生產(chǎn)消費消息量/日:4億條+
        • 生產(chǎn)消費消息大小/日:400G+

        MQCloud在充分考慮和吸收實際業(yè)務(wù)的需求后,以各個角色聚焦為核心,以全面監(jiān)控為目標(biāo),

        以滿足各業(yè)務(wù)端需求為己任,在不斷地發(fā)展和完善。

        在MQCloud逐漸成熟之后,秉承著服務(wù)于社區(qū)和吸收更多經(jīng)驗的理念,我們開放了源代碼。

        四、開源之路

        開放源代碼說不難也不難,說難也難。為什么這么說?

        不難就是因為代碼已經(jīng)有了,只是換個倉庫而已。

        而難點就是需要進行抽象設(shè)計,剝離不能開源的代碼(內(nèi)部模塊,代碼,地址等等)。

        經(jīng)過設(shè)計和拆分,MQCloud于18年開源了,從第一個版本release到現(xiàn)在已經(jīng)過去兩年了,

        期間隨著更新迭代大大小小一共release了20多個版本。

        其中不但包含功能更新、bug修復(fù)、wiki說明等,而且每個大版本都經(jīng)過詳細的測試和內(nèi)部的運行。

        之后很多小伙伴躍躍欲試,來試用它,并提出一些建議和意見,我們根據(jù)反饋來進一步完善它。

        我們將一直遵循我們的目標(biāo),堅定的走自己的開源之路:

        • 為業(yè)務(wù)提供可監(jiān)控,可預(yù)警,可滿足其各種需求的穩(wěn)定的MQ服務(wù)。
        • 積累MQ領(lǐng)域經(jīng)驗,將經(jīng)驗轉(zhuǎn)化為產(chǎn)品,更好的服務(wù)業(yè)務(wù)。

        后臺回復(fù) 學(xué)習(xí)資料 領(lǐng)取學(xué)習(xí)視頻


        如有收獲,點個在看,誠摯感謝

        瀏覽 40
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 国产12页 | 91黄片网站 | 涩涩电影网站 | 操逼免费小视频 | 亚洲婷婷综合网 |