1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Kafka 3.0新特性全面曝光,真香!

        共 23805字,需瀏覽 48分鐘

         ·

        2022-06-19 22:48

        導(dǎo)語 | kafka3.0的版本已經(jīng)試推行去zk的kafka架構(gòu)了,如果去掉了zk,那么在kafka新的版本當(dāng)中使用什么技術(shù)來代替了zk的位置呢,接下來我們一起來一探究竟,了解kafka的內(nèi)置共識機(jī)制和raft算法。


        一、Kafka簡介


        Kafka是一款開源的消息引擎系統(tǒng)。一個典型的Kafka體系架構(gòu)包括若干Producer、若干Broker、若干Consumer,以及一個ZooKeeper集群,如上圖所示。其中ZooKeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理、控制器的選舉等操作的。Producer將消息發(fā)送到Broker,Broker負(fù)責(zé)將收到的消息存儲到磁盤中,而Consumer負(fù)責(zé)從Broker訂閱并消費(fèi)消息。


        (一)Kafka核心組件


        • producer:消息生產(chǎn)者,就是向broker發(fā)送消息的客戶端。   


        • consumer:消息消費(fèi)者,就是從broker拉取數(shù)據(jù)的客戶端。


        • consumer group:消費(fèi)者組,由多個消費(fèi)者consumer組成。消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同的分區(qū),一個分區(qū)只能由同一個消費(fèi)者組內(nèi)的一個消費(fèi)者消費(fèi);消費(fèi)者組之間相互獨(dú)立,互不影響。所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是一個邏輯上的訂閱者。


        • broker:一臺服務(wù)器就是一個broker,一個集群由多個broker組成,一個broker可以有多個topic。


        • topic可以理解為一個隊(duì)列,所有的生產(chǎn)者和消費(fèi)者都是面向topic的。


        • partition:分區(qū),kafka中的topic為了提高拓展性和實(shí)現(xiàn)高可用而將它分布到不同的broker中,一個topic可以分為多個partition,每個partition都是有序的,即消息發(fā)送到隊(duì)列的順序跟消費(fèi)時拉取到的順序是一致的。


        • replication:副本。一個topic對應(yīng)的分區(qū)partition可以有多個副本,多個副本中只有一個為leader,其余的為follower。為了保證數(shù)據(jù)的高可用性,leader和follower會盡量均勻的分布在各個broker中,避免了leader所在的服務(wù)器宕機(jī)而導(dǎo)致topic不可用的問題?! ?/span>



        (二)kafka2當(dāng)中zk的作用



        • /admin:主要保存kafka當(dāng)中的核心的重要信息,包括類似于已經(jīng)刪除的topic就會保存在這個路徑下面。


        • /brokers:主要用于保存kafka集群當(dāng)中的broker信息,以及沒被刪除的topic信息。


        • /cluster: 主要用于保存kafka集群的唯一id信息,每個kafka集群都會給分配要給唯一id,以及對應(yīng)的版本號。


        • /config: 集群配置信息。


        • /controller:kafka集群當(dāng)中的控制器信息,控制器組件(Controller),是Apache Kafka的核心組件。它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個Kafka集群。


        • /controller_epoch:主要用于保存記錄controller的選舉的次數(shù)。


        • /isr_change_notification:isr列表發(fā)生變更時候的通知,在kafka當(dāng)中由于存在ISR列表變更的情況發(fā)生,為了保證ISR列表更新的及時性,定義了isr_change_notification這個節(jié)點(diǎn),主要用于通知Controller來及時將ISR列表進(jìn)行變更。


        • /latest_producer_id_block:使用`/latest_producer_id_block`節(jié)點(diǎn)來保存PID塊,主要用于能夠保證生產(chǎn)者的任意寫入請求都能夠得到響應(yīng)。


        • /log_dir_event_notification:主要用于保存當(dāng)broker當(dāng)中某些LogDir出現(xiàn)異常時候,例如磁盤損壞,文件讀寫失敗等異常時候,向ZK當(dāng)中增加一個通知序號,controller監(jiān)聽到這個節(jié)點(diǎn)的變化之后,就會做出對應(yīng)的處理操作。


        以上就是kafka在zk當(dāng)中保留的所有的所有的相關(guān)的元數(shù)據(jù)信息,這些元數(shù)據(jù)信息保證了kafka集群的正常運(yùn)行。

        二、kafka3的安裝配置


        在kafka3的版本當(dāng)中已經(jīng)徹底去掉了對zk的依賴,如果沒有了zk集群,那么kafka當(dāng)中是如何保存元數(shù)據(jù)信息的呢,這里我們通過kafka3的集群來一探究竟。


        (一)kafka安裝配置核心重要參數(shù)


        • Controller服務(wù)器


        不管是kafka2還是kafka3當(dāng)中,controller控制器都是必不可少的,通過controller控制器來維護(hù)kafka集群的正常運(yùn)行,例如ISR列表的變更,broker的上線或者下線,topic的創(chuàng)建,分區(qū)的指定等等各種操作都需要依賴于Controller,在kafka2當(dāng)中,controller的選舉需要通過zk來實(shí)現(xiàn),我們沒法控制哪些機(jī)器選舉成為Controller,而在kafka3當(dāng)中,我們可以通過配置文件來自己指定哪些機(jī)器成為Controller,這樣做的好處就是我們可以指定一些配置比較高的機(jī)器作為Controller節(jié)點(diǎn),從而保證controller節(jié)點(diǎn)的穩(wěn)健性。


        被選中的controller節(jié)點(diǎn)參與元數(shù)據(jù)集群的選舉,每個controller節(jié)點(diǎn)要么是Active狀態(tài),或者就是standBy狀態(tài)。

        • Process.Roles


        使用KRaft模式來運(yùn)行kafka集群的話,我們有一個配置叫做Process.Roles必須配置,這個參數(shù)有以下四個值可以進(jìn)行配置:


        • Process.Roles=Broker, 服務(wù)器在KRaft模式中充當(dāng)Broker。


        • Process.Roles=Controller, 服務(wù)器在KRaft模式下充當(dāng)Controller。


        • Process.Roles=Broker,Controller,服務(wù)器在KRaft模式中同時充當(dāng)Broker和Controller。


        • 如果process.roles沒有設(shè)置。那么集群就假定是運(yùn)行在ZooKeeper模式下。


        如果需要從zookeeper模式轉(zhuǎn)換成為KRaft模式,那么需要進(jìn)行重新格式化。如果一個節(jié)點(diǎn)同時是Broker和Controller節(jié)點(diǎn),那么就稱之為組合節(jié)點(diǎn)。


        實(shí)際工作當(dāng)中,如果有條件的話,盡量還是將Broker和Controller節(jié)點(diǎn)進(jìn)行分離部署。避免由于服務(wù)器資源不夠的情況導(dǎo)致OOM等一系列的問題



        • Quorum Voters


        通過controller.quorum.voters配置來實(shí)習(xí)哪些節(jié)點(diǎn)是Quorum的投票節(jié)點(diǎn),所有想要成為控制器的節(jié)點(diǎn),都必須放到這個配置里面。


        每個Broker和每個Controller都必須配置Controller.quorum.voters,該配置當(dāng)中提供的節(jié)點(diǎn)ID必須與提供給服務(wù)器的節(jié)點(diǎn)ID保持一直。


        每個Broker和每個Controller 都必須設(shè)置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的節(jié)點(diǎn)ID必須與提供給服務(wù)器的節(jié)點(diǎn)ID匹配。


        比如在Controller1上,node.Id必須設(shè)置為1,以此類推。注意,控制器id不強(qiáng)制要求你從0或1開始。然而,分配節(jié)點(diǎn)ID的最簡單和最不容易混淆的方法是給每個服務(wù)器一個數(shù)字ID,然后從0開始。



        (二)下載并解壓安裝包


        bigdata01下載kafka的安裝包,并進(jìn)行解壓:


        [hadoop@bigdata01 kraft]$ cd /opt/soft/[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1.0.tgz -C /opt/install/


        修改kafka的配置文件broker.properties:


        [hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0/config/kraft/[hadoop@bigdata01 kraft]$ vim broker.properties


        修改編輯內(nèi)容如下:


        node.id=1controller.quorum.voters=1@bigdata01:9093listeners=PLAINTEXT://bigdata01:9092advertised.listeners=PLAINTEXT://bigdata01:9092log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs


        創(chuàng)建兩個文件夾:


        [hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/kraftlogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/topiclogs


        同步安裝包到其他機(jī)器上面去。



        (三)服務(wù)器集群啟動


        啟動kafka服務(wù):


        [hadoop@bigdata01 kafka_2.12-3.1.0]$  ./bin/kafka-storage.sh random-uuidYkJwr6RESgSJv-sxa1R1mA[hadoop@bigdata01 kafka_2.12-3.1.0]$  ./bin/kafka-storage.sh format -t YkJwr6RESgSJv-sxa1R1mA -c ./config/kraft/server.propertiesFormatting /opt/install/kafka_2.12-3.1.0/topiclogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-server-start.sh ./config/kraft/server.properties



        (四)創(chuàng)建kafka的topic


        集群啟動成功之后,就可以來創(chuàng)建kafka的topic了,使用以下命令來創(chuàng)建kafka的topic:


        ./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092



        (五)任意一臺機(jī)器查看kafka的topic


        組成集群之后,任意一臺機(jī)器就可以通過以下命令來查看到剛才創(chuàng)建的topic了:


        [hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1.0/[hadoop@bigdata03 kafka_2.12-3.1.0]$ bin/kafka-topics.sh  --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092



        (六)消息生產(chǎn)與消費(fèi)


        使用命令行來生產(chǎn)以及消費(fèi)kafka當(dāng)中的消息:


        [hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test
        [hadoop@bigdata02 kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning



        三、Kafka當(dāng)中Raft的介紹


        (一)kafka強(qiáng)依賴zk所引發(fā)的問題


        前面我們已經(jīng)看到了kafka3集群在沒有zk集群的依賴下,也可以正常運(yùn)行,那么kafka2在zk當(dāng)中保存的各種重要元數(shù)據(jù)信息,在kafka3當(dāng)中如何實(shí)現(xiàn)保存的呢?


        kafka一直都是使用zk來管理集群以及所有的topic的元數(shù)據(jù),并且使用了zk的強(qiáng)一致性來選舉集群的controller,controller對整個集群的管理至關(guān)重要,包括分區(qū)的新增,ISR列表的維護(hù),等等很多功能都需要靠controller來實(shí)現(xiàn),然后使用zk來維護(hù)kafka的元數(shù)據(jù)也存在很多的問題以及存在性能瓶頸。


        以下是kafka將元數(shù)據(jù)保存在zk當(dāng)中的諸多問題。


        • 元數(shù)據(jù)存取困難


        元數(shù)據(jù)的存取過于困難,每次重新選舉的controller需要把整個集群的元數(shù)據(jù)重新restore,非常的耗時且影響集群的可用性。


        • 元數(shù)據(jù)更新網(wǎng)絡(luò)開銷大


        整個元數(shù)據(jù)的更新操作也是以全量推的方式進(jìn)行,網(wǎng)絡(luò)的開銷也會非常大。


        • 強(qiáng)耦合違背軟件設(shè)計(jì)原則


        Zookeeper對于運(yùn)維來說,維護(hù)Zookeeper也需要一定的開銷,并且kafka強(qiáng)耦合與zk也并不好,還得時刻擔(dān)心zk的宕機(jī)問題,違背軟件設(shè)計(jì)的高內(nèi)聚,低耦合的原則。


        • 網(wǎng)絡(luò)分區(qū)復(fù)雜度高


        Zookeeper本身并不能兼顧到broker與broker之間通信的狀態(tài),這就會導(dǎo)致網(wǎng)絡(luò)分區(qū)的復(fù)雜度成幾何倍數(shù)增長。


        • zk本身不適合做消息隊(duì)列


        zookeeper不適合做消息隊(duì)列,因?yàn)閦ookeeper有1M的消息大小限制 zookeeper的children太多會極大的影響性能znode太大也會影響性能 znode太大會導(dǎo)致重啟zkserver耗時10-15分鐘 zookeeper僅使用內(nèi)存作為存儲,所以不能存儲太多東西。


        • 并發(fā)訪問zk問題多


        最好單線程操作zk客戶端,不要并發(fā),臨界、競態(tài)問題太多。


        基于以上各種問題,所以提出了脫離zk的方案,轉(zhuǎn)向自助研發(fā)強(qiáng)一致性的元數(shù)據(jù)解決方案,也就是KIP-500。


        KIP-500議案提出了在Kafka中處理元數(shù)據(jù)的更好方法。基本思想是"Kafka on Kafka",將Kafka的元數(shù)據(jù)存儲在Kafka本身中,無需增加額外的外部存儲比如ZooKeeper等。


        去zookeeper之后的kafka新的架構(gòu)


        在KIP-500中,Kafka控制器會將其元數(shù)據(jù)存儲在Kafka分區(qū)中,而不是存儲在ZooKeeper中。但是,由于控制器依賴于該分區(qū),因此分區(qū)本身不能依賴控制器來進(jìn)行領(lǐng)導(dǎo)者選舉之類的事情。而是,管理該分區(qū)的節(jié)點(diǎn)必須實(shí)現(xiàn)自我管理的Raft仲裁。


        在kafka3.0的新的版本當(dāng)中,使用了新的KRaft協(xié)議,使用該協(xié)議來保證在元數(shù)據(jù)仲裁中準(zhǔn)確的復(fù)制元數(shù)據(jù),這個協(xié)議類似于zk當(dāng)中的zab協(xié)議以及類似于Raft協(xié)議,但是KRaft協(xié)議使用的是基于事件驅(qū)動的模式,與ZAB協(xié)議和Raft協(xié)議還有點(diǎn)不一樣


        在kafka3.0之前的的版本當(dāng)中,主要是借助于controller來進(jìn)行l(wèi)eader partition的選舉,而在3.0協(xié)議當(dāng)中,使用了KRaft來實(shí)現(xiàn)自己選擇leader,并最終令所有節(jié)點(diǎn)達(dá)成共識,這樣簡化了controller的選舉過程,效果更加高效。



        (二)kakfa3 Raft


        前面我們已經(jīng)知道了在kafka3當(dāng)中可以不用再依賴于zk來保存kafka當(dāng)中的元數(shù)據(jù)了,轉(zhuǎn)而使用Kafka Raft來實(shí)現(xiàn)元數(shù)據(jù)的一致性,簡稱KRaft,并且將元數(shù)據(jù)保存在kafka自己的服務(wù)器當(dāng)中,大大提高了kafka的元數(shù)據(jù)管理的性能。


        KRaft運(yùn)行模式的Kafka集群,不會將元數(shù)據(jù)存儲在Apache ZooKeeper中。即部署新集群的時候,無需部署ZooKeeper集群,因?yàn)镵afka將元數(shù)據(jù)存儲在Controller節(jié)點(diǎn)的KRaft Quorum中。KRaft可以帶來很多好處,比如可以支持更多的分區(qū),更快速的切換Controller,也可以避免Controller緩存的元數(shù)據(jù)和Zookeeper存儲的數(shù)據(jù)不一致帶來的一系列問題。


        在新的版本當(dāng)中,控制器Controller節(jié)點(diǎn)我們可以自己進(jìn)行指定,這樣最大的好處就是我們可以自己選擇一些配置比較好的機(jī)器成為Controller節(jié)點(diǎn),而不像在之前的版本當(dāng)中,我們無法指定哪臺機(jī)器成為Controller節(jié)點(diǎn),而且controller節(jié)點(diǎn)與broker節(jié)點(diǎn)可以運(yùn)行在同一臺機(jī)器上,并且控制器controller節(jié)點(diǎn)不再向broker推送更新消息,而是讓Broker從這個Controller Leader節(jié)點(diǎn)進(jìn)行拉去元數(shù)據(jù)的更新。




        (三)如何查看kafka3當(dāng)中的元數(shù)據(jù)信息


        在kafka3當(dāng)中,不再使用zk來保存元數(shù)據(jù)信息了,那么在kafka3當(dāng)中如何查看元數(shù)據(jù)信息呢,我們也可以通過kafka自帶的命令來進(jìn)行查看元數(shù)據(jù)信息,在KRaft中,有兩個命令常用命令腳本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我們來進(jìn)行關(guān)注,因?yàn)槲覀兛梢酝ㄟ^這兩個腳本來查看kafka當(dāng)中保存的元數(shù)據(jù)信息。


        • Kafka-dump-log.sh腳本來導(dǎo)出元數(shù)據(jù)信息


        KRaft模式下,所有的元數(shù)據(jù)信息都保存到了一個內(nèi)部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我們都可以去到這個topic上面進(jìn)行查看,我們可以通過kafka-dump-log.sh這個腳本來進(jìn)行查看該topic的信息。


        Kafka-dump-log.sh是一個之前就有的工具,用來查看Topic的的文件內(nèi)容。這工具加了一個參數(shù)--cluster-metadata-decoder用來,查看元數(shù)據(jù)日志,如下所示:


        [hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata  --files  /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.index,/opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log  >>/opt/metadata.txt



        • kafka-metadata-shell.sh直接查看元數(shù)據(jù)信息


        平時我們用zk的時候,習(xí)慣了用zk命令行查看數(shù)據(jù),簡單快捷。bin目錄下自帶了kafka-metadata-shell.sh工具,可以允許你像zk一樣方便的查看數(shù)據(jù)。


        使用kafka-metadata-shell.sh腳本進(jìn)入kafka的元數(shù)據(jù)客戶端


        [hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log


        四、Raft算法介紹


        raft算法中文版本翻譯介紹:

        https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md


        著名的CAP原則又稱CAP定理的提出,真正奠基了分布式系統(tǒng)的誕生,CAP定理指的是在一個分布式系統(tǒng)中,[一致性]、[可用性](Availability)、[分區(qū)容錯性](Partition tolerance),這三個要素最多只能同時實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧(nosql)。


        分布式系統(tǒng)為了提高系統(tǒng)的可靠性,一般都會選擇使用多副本的方式來進(jìn)行實(shí)現(xiàn),例如hdfs當(dāng)中數(shù)據(jù)的多副本,kafka集群當(dāng)中分區(qū)的多副本等,但是一旦有了多副本的話,那么久面臨副本之間一致性的問題,而一致性算法就是 用于解決分布式環(huán)境下多副本的數(shù)據(jù)一致性的問題。業(yè)界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比較晦澀難懂,不太容易理解,所以還有一種叫做Raft的算法,更加簡單容易理解的實(shí)現(xiàn)了一致性算法。


        (一)Raft協(xié)議的工作原理


        • Raft協(xié)議當(dāng)中的角色分布


        Raft協(xié)議將分布式系統(tǒng)當(dāng)中的角色分為Leader(領(lǐng)導(dǎo)者),F(xiàn)ollower(跟從者)以及Candidate(候選者)


        • Leader:主節(jié)點(diǎn)的角色,主要是接收客戶端請求,并向Follower同步日志,當(dāng)日志同步到過半及以上節(jié)點(diǎn)之后,告訴follower進(jìn)行提交日志。


        • Follower:從節(jié)點(diǎn)的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,進(jìn)行提交保存的日志。


        • Candidate:Leader選舉過程中的臨時角色。


        • Raft協(xié)議當(dāng)中的底層原理


        Raft協(xié)議當(dāng)中會選舉出Leader節(jié)點(diǎn),Leader作為主節(jié)點(diǎn),完全負(fù)責(zé)replicate log的管理。Leader負(fù)責(zé)接受所有客戶端的請求,然后復(fù)制到Follower節(jié)點(diǎn),如果leader故障,那么follower會重新選舉leader,Raft協(xié)議的一致性,概括主要可以分為以下三個重要部分:


        • Leader選舉


        • 日志復(fù)制


        • 安全性


        其中Leader選舉和日志復(fù)制是Raft協(xié)議當(dāng)中最為重要的。


        Raft協(xié)議要求系統(tǒng)當(dāng)中,任意一個時刻,只有一個leader,正常工作期間,只有Leader和Follower角色,并且Raft協(xié)議采用了類似網(wǎng)絡(luò)租期的方式來進(jìn)行管理維護(hù)整個集群,Raft協(xié)議將時間分為一個個的時間段(term),也叫作任期,每一個任期都會選舉一個Leader來管理維護(hù)整個集群,如果這個時間段的Leader宕機(jī),那么這一個任期結(jié)束,繼續(xù)重新選舉leader。


        Raft算法將時間劃分成為任意不同長度的任期(term)。任期用連續(xù)的數(shù)字進(jìn)行表示。每一個任期的開始都是一次選舉(election),一個或多個候選人會試圖成為領(lǐng)導(dǎo)人。如果一個候選人贏得了選舉,它就會在該任期的剩余時間擔(dān)任領(lǐng)導(dǎo)人。在某些情況下,選票會被瓜分,有可能沒有選出領(lǐng)導(dǎo)人,那么,將會開始另一個任期,并且立刻開始下一次選舉。Raft算法保證在給定的一個任期最多只有一個領(lǐng)導(dǎo)人。

        • Leader選舉的過程


        Raft使用心跳來進(jìn)行觸發(fā)leader選舉,當(dāng)服務(wù)器啟動時,初始化為follower角色。leader向所有Follower發(fā)送周期性心跳,如果Follower在選舉超時間內(nèi)沒有收到Leader的心跳,就會認(rèn)為leader宕機(jī),稍后發(fā)起leader的選舉。


        每個Follower都會有一個倒計(jì)時時鐘,是一個隨機(jī)的值,表示的是Follower等待成為Leader的時間,倒計(jì)時時鐘先跑完,就會當(dāng)選成為Leader,這樣做得好處就是每一個節(jié)點(diǎn)都有機(jī)會成為Leader。


        當(dāng)滿足以下三個條件之一時,Quorum中的某個節(jié)點(diǎn)就會觸發(fā)選舉:


        • 向Leader發(fā)送Fetch請求后,在超時閾值quorum.fetch.timeout.ms之后仍然沒有得到Fetch響應(yīng),表示Leader疑似失敗。


        • 從當(dāng)前Leader收到了EndQuorumEpoch請求,表示Leader已退位。


        • Candidate狀態(tài)下,在超時閾值quorum.election.timeout.ms之后仍然沒有收到多數(shù)票,也沒有Candidate贏得選舉,表示此次選舉作廢,重新進(jìn)行選舉。


        具體詳細(xì)過程實(shí)現(xiàn)描述如下:


        • 增加節(jié)點(diǎn)本地的current term,切換到candidate狀態(tài)。


        • 自己給自己投一票。


        • 給其他節(jié)點(diǎn)發(fā)送RequestVote RPCs,要求其他節(jié)點(diǎn)也投自己一票。


        • 等待其他節(jié)點(diǎn)的投票回復(fù)。


        整個過程中的投票過程可以用下圖進(jìn)行表述。



        leader節(jié)點(diǎn)選舉的限制


        • 每個節(jié)點(diǎn)只能投一票,投給自己或者投給別人。


        • 候選人所知道的日志信息,一定不能比自己的更少,即能被選舉成為leader節(jié)點(diǎn),一定包含了所有已經(jīng)提交的日志。


        • 先到先得的原則


        • 數(shù)據(jù)一致性保證(日志復(fù)制機(jī)制)


        前面通過選舉機(jī)制之后,選舉出來了leader節(jié)點(diǎn),然后leader節(jié)點(diǎn)對外提供服務(wù),所有的客戶端的請求都會發(fā)送到leader節(jié)點(diǎn),由leader節(jié)點(diǎn)來調(diào)度這些并發(fā)請求的處理順序,保證所有節(jié)點(diǎn)的狀態(tài)一致,leader會把請求作為日志條目(Log entries)加入到他的日志當(dāng)中,然后并行的向其他服務(wù)器發(fā)起AppendEntries RPC復(fù)制日志條目。當(dāng)這條請求日志被成功復(fù)制到大多數(shù)服務(wù)器上面之后,Leader將這條日志應(yīng)用到它的狀態(tài)機(jī)并向客戶端返回執(zhí)行結(jié)果。


        • 客戶端的每個請求都包含被復(fù)制狀態(tài)機(jī)執(zhí)行的指令


        • leader將客戶端請求作為一條心得日志添加到日志文件中,然后并行發(fā)起RPC給其他的服務(wù)器,讓他們復(fù)制這條信息到自己的日志文件中保存。


        • 如果這條日志被成功復(fù)制,也就是大部分的follower都保存好了執(zhí)行指令日志,leader就應(yīng)用這條日志到自己的狀態(tài)機(jī)中,并返回給客戶端。


        • 如果follower宕機(jī)或者運(yùn)行緩慢或者數(shù)據(jù)丟失,leader會不斷地進(jìn)行重試,直至所有在線的follower都成功復(fù)制了所有的日志條目。


        與維護(hù)Consumer offset的方式類似,脫離ZK之后的Kafka集群將元數(shù)據(jù)視為日志,保存在一個內(nèi)置的Topic中,且該Topic只有一個Partition。



        元數(shù)據(jù)日志的消息格式與普通消息沒有太大不同,但必須攜帶Leader的紀(jì)元值(即之前的Controller epoch):


        Record => Offset LeaderEpoch ControlType Key Value Timestamp


        這樣,F(xiàn)ollower以拉模式復(fù)制Leader日志,就相當(dāng)于以Consumer角色消費(fèi)元數(shù)據(jù)Topic,符合Kafka原生的語義。


        那么在KRaft協(xié)議中,是如何維護(hù)哪些元數(shù)據(jù)日志已經(jīng)提交——即已經(jīng)成功復(fù)制到多數(shù)的Follower節(jié)點(diǎn)上的呢?Kafka仍然借用了原生副本機(jī)制中的概念——high watermark(HW,高水位線)保證日志不會丟失,HW的示意圖如下。



        狀態(tài)機(jī)說明


        要讓所有節(jié)點(diǎn)達(dá)成一致性的狀態(tài),大部分都是基于復(fù)制狀態(tài)機(jī)來實(shí)現(xiàn)的(Replicated state machine)


        簡單來說就是:初始相同的狀態(tài)+相同的輸入過程=相同的結(jié)束狀態(tài),這個其實(shí)也好理解,就類似于一對雙胞胎,出生時候就長得一樣,然后吃的喝的用的穿的都一樣,你自然很難分辨。其中最重要的就是一定要注意中間的相同輸入過程,各個不同節(jié)點(diǎn)要以相同且確定性的函數(shù)來處理輸入,而不要引入一個不確定的值。使用replicated log來實(shí)現(xiàn)每個節(jié)點(diǎn)都順序的寫入客戶端請求,然后順序的處理客戶端請求,最終就一定能夠達(dá)到最終一致性。


        狀態(tài)機(jī)安全性保證


        在安全性方面,KRaft與傳統(tǒng)Raft的選舉安全性、領(lǐng)導(dǎo)者只追加、日志匹配和領(lǐng)導(dǎo)者完全性保證都是幾乎相同的。下面只簡單看看狀態(tài)機(jī)安全性是如何保證的,仍然舉論文中的極端例子:



        • 在時刻a,節(jié)點(diǎn)S1是Leader,epoch=2的日志只復(fù)制給了S2就崩潰了。


        • 在時刻b,S5被選舉為Leader,epoch=3的日志還沒來得及復(fù)制,也崩潰了。


        • 在時刻c,S1又被選舉為Leader,繼續(xù)復(fù)制日志,將epoch=2的日志給了S3。此時該日志復(fù)制給了多數(shù)節(jié)點(diǎn),但還未提交。


        • 在時刻d,S1又崩潰,并且S5重新被選舉為領(lǐng)導(dǎo)者,將epoch=3的日志復(fù)制給S0~S4。


        此時日志與新Leader S5的日志發(fā)生了沖突,如果按上圖中d1的方式處理,消息2就會丟失。傳統(tǒng)Raft協(xié)議的處理方式是:在Leader任期開始時,立刻提交一條空的日志,所以上圖中時刻c的情況不會發(fā)生,而是如同d2一樣先提交epoch=4的日志,連帶提交epoch=2的日志。


        與傳統(tǒng)Raft不同,KRaft附加了一個較強(qiáng)的約束:當(dāng)新的Leader被選舉出來,但還沒有成功提交屬于它的epoch的日志時,不會向前推進(jìn)HW。也就是說,即使上圖中時刻c的情況發(fā)生了,消息2也被視為沒有成功提交,所以按照d1方式處理是安全的。


        日志格式說明


        所有節(jié)點(diǎn)持久化保存在本地的日志,大概就是類似于這個樣子:


        上圖顯示,共有八條日志數(shù)據(jù),其中已經(jīng)提交了7條,提交的日志都將通過狀態(tài)機(jī)持久化到本地磁盤當(dāng)中,防止宕機(jī)。


        日志復(fù)制的保證機(jī)制


        如果兩個節(jié)點(diǎn)不同的日志文件當(dāng)中存儲著相同的索引和任期號,那么他們所存儲的命令是相同的。(原因:leader最多在一個任期里的一個日志索引位置創(chuàng)建一條日志條目,日志條目所在的日志位置從來不會改變)。


        如果不同日志中兩個條目有著相同的索引和任期號,那么他們之前的所有條目都是一樣的(原因:每次RPC發(fā)送附加日志時,leader會把這條日志前面的日志下標(biāo)和任期號一起發(fā)送給follower,如果follower發(fā)現(xiàn)和自己的日志不匹配,那么就拒絕接受這條日志,這個稱之為一致性檢查)


        日志的不正常情況


        一般情況下,Leader和Followers的日志保持一致,因此Append Entries一致性檢查通常不會失敗。然而,Leader崩潰可能會導(dǎo)致日志不一致:舊的Leader可能沒有完全復(fù)制完日志中的所有條目。


        下圖闡述了一些Followers可能和新的Leader日志不同的情況。一個Follower可能會丟失掉Leader上的一些條目,也有可能包含一些Leader沒有的條目,也有可能兩者都會發(fā)生。丟失的或者多出來的條目可能會持續(xù)多個任期。



        如何保證日志的正常復(fù)制


        如果出現(xiàn)了上述leader宕機(jī),導(dǎo)致follower與leader日志不一致的情況,那么就需要進(jìn)行處理,保證follower上的日志與leader上的日志保持一致,leader通過強(qiáng)制follower復(fù)制它的日志來處理不一致的問題,follower與leader不一致的日志會被強(qiáng)制覆蓋。leader為了最大程度的保證日志的一致性,且保證日志最大量,leader會尋找follower與他日志一致的地方,然后覆蓋follower之后的所有日志條目,從而實(shí)現(xiàn)日志數(shù)據(jù)的一致性。


        具體的操作就是:leader會從后往前不斷對比,每次Append Entries失敗后嘗試前一個日志條目,直到成功找到每個Follower的日志一致的位置點(diǎn),然后向該Follower所在位置之后的條目進(jìn)行覆蓋。


        詳細(xì)過程如下:


        • Leader維護(hù)了每個Follower節(jié)點(diǎn)下一次要接收的日志的索引,即nextIndex。


        • Leader選舉成功后將所有Follower的nextIndex設(shè)置為自己的最后一個日志條目+1。


        • Leader將數(shù)據(jù)推送給Follower,如果Follower驗(yàn)證失?。╪extIndex不匹配),則在下一次推送日志時縮小nextIndex,直到nextIndex驗(yàn)證通過。


        總結(jié)一下就是:當(dāng)leader和follower日志沖突的時候,leader將校驗(yàn) follower最后一條日志是否和leader匹配,如果不匹配,將遞減查詢,直到匹配,匹配后,刪除沖突的日志。這樣就實(shí)現(xiàn)了主從日志的一致性。



        (二)Raft協(xié)議算法代碼實(shí)現(xiàn)


        前面我們已經(jīng)大致了解了Raft協(xié)議算法的實(shí)現(xiàn)原理,如果我們要自己實(shí)現(xiàn)一個Raft協(xié)議的算法,其實(shí)就是將我們講到的理論知識給翻譯成為代碼的過程,具體的開發(fā)需要考慮的細(xì)節(jié)比較多,代碼量肯定也比較大,好在有人已經(jīng)實(shí)現(xiàn)了Raft協(xié)議的算法了,我們可以直接拿過來使用。


        創(chuàng)建maven工程并導(dǎo)入jar包地址如下:


        <dependencies>
        <dependency> <groupId>com.github.wenweihu86.raft</groupId> <artifactId>raft-java-core</artifactId> <version>1.8.0</version> </dependency>
        <dependency> <groupId>com.github.wenweihu86.rpc</groupId> <artifactId>rpc-java</artifactId> <version>1.8.0</version> </dependency>
        <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>5.1.4</version> </dependency>
        </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>


        定義Server端代碼實(shí)現(xiàn):


        public class Server1 {    public static void main(String[] args) {        // parse args        // peers, format is "host:port:serverId,host2:port2:serverId2"
        //localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1 String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3";
        // local server RaftMessage.Server localServer = parseServer("localhost:16010:1");
        String[] splitArray = servers.split(","); List<RaftMessage.Server> serverList = new ArrayList<>(); for (String serverString : splitArray) { RaftMessage.Server server = parseServer(serverString); serverList.add(server); }

        // 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 設(shè)置Raft選項(xiàng),比如: // just for test snapshot RaftOptions raftOptions = new RaftOptions(); /* raftOptions.setSnapshotMinLogSize(10 * 1024); raftOptions.setSnapshotPeriodSeconds(30); raftOptions.setMaxSegmentFileSize(1024 * 1024);*/ // 應(yīng)用狀態(tài)機(jī) ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir()); // 初始化RaftNode RaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine); raftNode.getLeaderId(); // 注冊Raft節(jié)點(diǎn)之間相互調(diào)用的服務(wù) RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode); server.registerService(raftConsensusService); // 注冊給Client調(diào)用的Raft服務(wù) RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注冊應(yīng)用自己提供的服務(wù) ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine); server.registerService(exampleService); // 啟動RPCServer,初始化Raft節(jié)點(diǎn) server.start(); raftNode.init(); }
        private static RaftMessage.Server parseServer(String serverString) { String[] splitServer = serverString.split(":"); String host = splitServer[0]; Integer port = Integer.parseInt(splitServer[1]); Integer serverId = Integer.parseInt(splitServer[2]); RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder() .setHost(host).setPort(port).build(); RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder(); RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build(); return server; }}


        定義客戶端代碼實(shí)現(xiàn)如下:


        public class ClientMain {    public static void main(String[] args) {        // parse args        String ipPorts = args[0];        String key = args[1];        String value = null;        if (args.length > 2) {            value = args[2];        }        // init rpc client        RPCClient rpcClient = new RPCClient(ipPorts);        ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class);        final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace();        // set        if (value != null) {            ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder()                    .setKey(key).setValue(value).build();            ExampleMessage.SetResponse setResponse = exampleService.set(setRequest);            try {                System.out.printf("set request, key=%s value=%s response=%s\n",                        key, value, printer.print(setResponse));            } catch (Exception ex) {                ex.printStackTrace();            }        } else {            // get            ExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build();            ExampleMessage.GetResponse getResponse = exampleService.get(getRequest);            try {                String value1 = getResponse.getValue();                System.out.println(value1);                System.out.printf("get request, key=%s, response=%s\n",                        key, printer.print(getResponse));            } catch (Exception ex) {                ex.printStackTrace();            }        }
        rpcClient.stop(); }}


        先啟動服務(wù)端,然后啟動客戶端,就可以將實(shí)現(xiàn)客戶端向服務(wù)端發(fā)送消息,并且服務(wù)端會向三臺機(jī)器進(jìn)行保存消息了。


        五、Kafka常見問題


        (一)消息隊(duì)列模型知道嗎?Kafka是怎么做到支持這兩種模型的?


        對于傳統(tǒng)的消息隊(duì)列系統(tǒng)支持兩個模型:


        • 點(diǎn)對點(diǎn):也就是消息只能被一個消費(fèi)者消費(fèi),消費(fèi)完后消息刪除。


        • 發(fā)布訂閱:相當(dāng)于廣播模式,消息可以被所有消費(fèi)者消費(fèi)。


        kafka其實(shí)就是通過Consumer Group同時支持了這兩個模型。如果說所有消費(fèi)者都屬于一個Group,消息只能被同一個Group內(nèi)的一個消費(fèi)者消費(fèi),那就是點(diǎn)對點(diǎn)模式。如果每個消費(fèi)者都是一個單獨(dú)的Group,那么就是發(fā)布訂閱模式。



        (二)說說Kafka通信過程原理嗎?


        首先kafka broker啟動的時候,會去向Zookeeper注冊自己的ID(創(chuàng)建臨時節(jié)點(diǎn)),這個ID可以配置也可以自動生成,同時會去訂閱Zookeeper的brokers/ids路徑,當(dāng)有新的broker加入或者退出時,可以得到當(dāng)前所有broker信。


        生產(chǎn)者啟動的時候會指定bootstrap.servers,通過指定的broker地址,Kafka就會和這些broker創(chuàng)建TCP連接(通常我們不用配置所有的broker服務(wù)器地址,否則kafka會和配置的所有broker都建立TCP連接)


        隨便連接到任何一臺broker之后,然后再發(fā)送請求獲取元數(shù)據(jù)信息(包含有哪些主題、主題都有哪些分區(qū)、分區(qū)有哪些副本,分區(qū)的Leader副本等信息)


        接著就會創(chuàng)建和所有broker的TCP連接。


        之后就是發(fā)送消息的過程。


        消費(fèi)者和生產(chǎn)者一樣,也會指定bootstrap.servers屬性,然后選擇一臺broker創(chuàng)建TCP連接,發(fā)送請求找到協(xié)調(diào)者所在的broker。


        然后再和協(xié)調(diào)者broker創(chuàng)建TCP連接,獲取元數(shù)據(jù)。


        根據(jù)分區(qū)Leader節(jié)點(diǎn)所在的broker節(jié)點(diǎn),和這些broker分別創(chuàng)建連接。


        最后開始消費(fèi)消息。



        (三)發(fā)送消息時如何選擇分區(qū)的?


        主要有兩種方式:


        • 輪詢,按照順序消息依次發(fā)送到不同的分區(qū)。


        • 隨機(jī),隨機(jī)發(fā)送到某個分區(qū)。


        如果消息指定key,那么會根據(jù)消息的key進(jìn)行hash,然后對partition分區(qū)數(shù)量取模,決定落在哪個分區(qū)上,所以,對于相同key的消息來說,總是會發(fā)送到同一個分區(qū)上,也是我們常說的消息分區(qū)有序性。


        很常見的場景就是我們希望下單、支付消息有順序,這樣以訂單ID作為key發(fā)送消息就達(dá)到了分區(qū)有序性的目的。


        如果沒有指定key,會執(zhí)行默認(rèn)的輪詢負(fù)載均衡策略,比如第一條消息落在P0,第二條消息落在P1,然后第三條又在P1。


        除此之外,對于一些特定的業(yè)務(wù)場景和需求,還可以通過實(shí)現(xiàn)Partitioner接口,重寫configure和partition方法來達(dá)到自定義分區(qū)的效果。



        (四)為什么需要分區(qū)?有什么好處?


        這個問題很簡單,如果說不分區(qū)的話,我們發(fā)消息寫數(shù)據(jù)都只能保存到一個節(jié)點(diǎn)上,這樣的話就算這個服務(wù)器節(jié)點(diǎn)性能再好最終也支撐不住。

        實(shí)際上分布式系統(tǒng)都面臨這個問題,要么收到消息之后進(jìn)行數(shù)據(jù)切分,要么提前切分,kafka正是選擇了前者,通過分區(qū)可以把數(shù)據(jù)均勻地分布到不同的節(jié)點(diǎn)。


        分區(qū)帶來了負(fù)載均衡和橫向擴(kuò)展的能力。


        發(fā)送消息時可以根據(jù)分區(qū)的數(shù)量落在不同的Kafka服務(wù)器節(jié)點(diǎn)上,提升了并發(fā)寫消息的性能,消費(fèi)消息的時候又和消費(fèi)者綁定了關(guān)系,可以從不同節(jié)點(diǎn)的不同分區(qū)消費(fèi)消息,提高了讀消息的能力。


        另外一個就是分區(qū)又引入了副本,冗余的副本保證了Kafka的高可用和高持久性。



        (五)詳細(xì)說說消費(fèi)者組和消費(fèi)者重平衡?


        Kafka中的消費(fèi)者組訂閱topic主題的消息,一般來說消費(fèi)者的數(shù)量最好要和所有主題分區(qū)的數(shù)量保持一致最好(舉例子用一個主題,實(shí)際上當(dāng)然是可以訂閱多個主題)。


        當(dāng)消費(fèi)者數(shù)量小于分區(qū)數(shù)量的時候,那么必然會有一個消費(fèi)者消費(fèi)多個分區(qū)的消息。


        而消費(fèi)者數(shù)量超過分區(qū)的數(shù)量的時候,那么必然會有消費(fèi)者沒有分區(qū)可以消費(fèi)。


        所以,消費(fèi)者組的好處一方面在上面說到過,可以支持多種消息模型,另外的話根據(jù)消費(fèi)者和分區(qū)的消費(fèi)關(guān)系,支撐橫向擴(kuò)容伸縮。



        當(dāng)我們知道消費(fèi)者如何消費(fèi)分區(qū)的時候,就顯然會有一個問題出現(xiàn)了,消費(fèi)者消費(fèi)的分區(qū)是怎么分配的,有先加入的消費(fèi)者時候怎么辦?


        舊版本的重平衡過程主要通過ZK監(jiān)聽器的方式來觸發(fā),每個消費(fèi)者客戶端自己去執(zhí)行分區(qū)分配算法。


        新版本則是通過協(xié)調(diào)者來完成,每一次新的消費(fèi)者加入都會發(fā)送請求給協(xié)調(diào)者去獲取分區(qū)的分配,這個分區(qū)分配的算法邏輯由協(xié)調(diào)者來完成。


        而重平衡Rebalance就是指的有新消費(fèi)者加入的情況,比如剛開始我們只有消費(fèi)者A在消費(fèi)消息,過了一段時間消費(fèi)者B和C加入了,這時候分區(qū)就需要重新分配,這就是重平衡,也可以叫做再平衡,但是重平衡的過程和我們的GC時候STW很像,會導(dǎo)致整個消費(fèi)群組停止工作,重平衡期間都無法消息消息。


        另外,發(fā)生重平衡并不是只有這一種情況,因?yàn)橄M(fèi)者和分區(qū)總數(shù)是存在綁定關(guān)系的,上面也說了,消費(fèi)者數(shù)量最好和所有主題的分區(qū)總數(shù)一樣。


        那只要消費(fèi)者數(shù)量、主題數(shù)量(比如用的正則訂閱的主題)、分區(qū)數(shù)量任何一個發(fā)生改變,都會觸發(fā)重平衡。


        下面說說重平衡的過程。


        重平衡的機(jī)制依賴消費(fèi)者和協(xié)調(diào)者之間的心跳來維持,消費(fèi)者會有一個獨(dú)立的線程去定時發(fā)送心跳給協(xié)調(diào)者,這個可以通過參數(shù)heartbeat.interval.ms來控制發(fā)送心跳的間隔時間。


        每個消費(fèi)者第一次加入組的時候都會向協(xié)調(diào)者發(fā)送JoinGroup請求,第一個發(fā)送這個請求的消費(fèi)者會成為“群主”,協(xié)調(diào)者會返回組成員列表給群主。


        群主執(zhí)行分區(qū)分配策略,然后把分配結(jié)果通過SyncGroup請求發(fā)送給協(xié)調(diào)者,協(xié)調(diào)者收到分區(qū)分配結(jié)果。


        其他組內(nèi)成員也向協(xié)調(diào)者發(fā)送SyncGroup,協(xié)調(diào)者把每個消費(fèi)者的分區(qū)分配分別響應(yīng)給他們。



        (六)具體講講分區(qū)分配策略?


        主要有3種分配策略


        Range


        對分區(qū)進(jìn)行排序,排序越靠前的分區(qū)能夠分配到更多的分區(qū)。


        比如有3個分區(qū),消費(fèi)者A排序更靠前,所以能夠分配到P0\P1兩個分區(qū),消費(fèi)者B就只能分配到一個P2。


        如果是4個分區(qū)的話,那么他們會剛好都是分配到2個。



        但是這個分配策略會有點(diǎn)小問題,他是根據(jù)主題進(jìn)行分配,所以如果消費(fèi)者組訂閱了多個主題,那就有可能導(dǎo)致分區(qū)分配不均衡。


        比如下圖中兩個主題的P0\P1都被分配給了A,這樣A有4個分區(qū),而B只有2個,如果這樣的主題數(shù)量越多,那么不均衡就越嚴(yán)重。



        RoundRobin


        也就是我們常說的輪詢了,這個就比較簡單了,不畫圖你也能很容易理解。

        這個會根據(jù)所有的主題進(jìn)行輪詢分配,不會出現(xiàn)Range那種主題越多可能導(dǎo)致分區(qū)分配不均衡的問題。


        P0->A,P1->B,P1->A。。。以此類推



        Sticky


        這個從字面看來意思就是粘性策略,大概是這個意思。主要考慮的是在分配均衡的前提下,讓分區(qū)的分配更小的改動。


        比如之前P0\P1分配給消費(fèi)者A,那么下一次盡量還是分配給A。


        這樣的好處就是連接可以復(fù)用,要消費(fèi)消息總是要和broker去連接的,如果能夠保持上一次分配的分區(qū)的話,那么就不用頻繁的銷毀創(chuàng)建連接了。



        (七)如何保證消息可靠性?


        • 生產(chǎn)者發(fā)送消息丟失


        kafka支持3種方式發(fā)送消息,這也是常規(guī)的3種方式,發(fā)送后不管結(jié)果、同步發(fā)送、異步發(fā)送,基本上所有的消息隊(duì)列都是這樣玩的。


        • 發(fā)送并忘記,直接調(diào)用發(fā)送send方法,不管結(jié)果,雖然可以開啟自動重試,但是肯定會有消息丟失的可能。


        • 同步發(fā)送,同步發(fā)送返回Future對象,我們可以知道發(fā)送結(jié)果,然后進(jìn)行處理。


        • 異步發(fā)送,發(fā)送消息,同時指定一個回調(diào)函數(shù),根據(jù)結(jié)果進(jìn)行相應(yīng)的處理。


        為了保險起見,一般我們都會使用異步發(fā)送帶有回調(diào)的方式進(jìn)行發(fā)送消息,再設(shè)置參數(shù)為發(fā)送消息失敗不停地重試。


        acks=all,這個參數(shù)有可以配置0|1|all。


        0表示生產(chǎn)者寫入消息不管服務(wù)器的響應(yīng),可能消息還在網(wǎng)絡(luò)緩沖區(qū),服務(wù)器根本沒有收到消息,當(dāng)然會丟失消息。


        1表示至少有一個副本收到消息才認(rèn)為成功,一個副本那肯定就是集群的Leader副本了,但是如果剛好Leader副本所在的節(jié)點(diǎn)掛了,F(xiàn)ollower沒有同步這條消息,消息仍然丟失了。


        配置all的話表示所有ISR都寫入成功才算成功,那除非所有ISR里的副本全掛了,消息才會丟失。


        retries=N,設(shè)置一個非常大的值,可以讓生產(chǎn)者發(fā)送消息失敗后不停重試

        Kafka 自身消息丟失。


        kafka因?yàn)橄懭胧峭ㄟ^PageCache異步寫入磁盤的,因此仍然存在丟失消息的可能。


        因此針對kafka自身丟失的可能設(shè)置參數(shù):


        replication.factor=N,設(shè)置一個比較大的值,保證至少有2個或者以上的副本。


        min.insync.replicas=N,代表消息如何才能被認(rèn)為是寫入成功,設(shè)置大于1的數(shù),保證至少寫入1個或者以上的副本才算寫入消息成功。


        unclean.leader.election.enable=false,這個設(shè)置意味著沒有完全同步的分區(qū)副本不能成為Leader副本,如果是true的話,那些沒有完全同步Leader的副本成為Leader之后,就會有消息丟失的風(fēng)險。


        • 消費(fèi)者消息丟失


        消費(fèi)者丟失的可能就比較簡單,關(guān)閉自動提交位移即可,改為業(yè)務(wù)處理成功手動提交。


        因?yàn)橹仄胶獍l(fā)生的時候,消費(fèi)者會去讀取上一次提交的偏移量,自動提交默認(rèn)是每5秒一次,這會導(dǎo)致重復(fù)消費(fèi)或者丟失消息。


        enable.auto.commit=false,設(shè)置為手動提交。


        還有一個參數(shù)我們可能也需要考慮進(jìn)去的:


        auto.offset.reset=earliest,這個參數(shù)代表沒有偏移量可以提交或者broker上不存在偏移量的時候,消費(fèi)者如何處理。earliest代表從分區(qū)的開始位置讀取,可能會重復(fù)讀取消息,但是不會丟失,消費(fèi)方一般我們肯定要自己保證冪等,另外一種latest表示從分區(qū)末尾讀取,那就會有概率丟失消息。

        綜合這幾個參數(shù)設(shè)置,我們就能保證消息不會丟失,保證了可靠性。



        (八)聊聊副本和它的同步原理吧?


        Kafka副本的之前提到過,分為Leader副本和Follower副本,也就是主副本和從副本,和其他的比如Mysql不一樣的是,Kafka中只有Leader副本會對外提供服務(wù),F(xiàn)ollower副本只是單純地和Leader保持?jǐn)?shù)據(jù)同步,作為數(shù)據(jù)冗余容災(zāi)的作用。


        在Kafka中我們把所有副本的集合統(tǒng)稱為AR(Assigned Replicas),和Leader副本保持同步的副本集合稱為ISR(InSyncReplicas)。


        ISR是一個動態(tài)的集合,維持這個集合會通過replica.lag.time.max.ms參數(shù)來控制,這個代表落后Leader副本的最長時間,默認(rèn)值10秒,所以只要Follower副本沒有落后Leader副本超過10秒以上,就可以認(rèn)為是和Leader同步的(簡單可以認(rèn)為就是同步時間差)。


        另外還有兩個關(guān)鍵的概念用于副本之間的同步:


        HW(High Watermark):高水位,也叫做復(fù)制點(diǎn),表示副本間同步的位置。如下圖所示,0~4綠色表示已經(jīng)提交的消息,這些消息已經(jīng)在副本之間進(jìn)行同步,消費(fèi)者可以看見這些消息并且進(jìn)行消費(fèi),4~6黃色的則是表示未提交的消息,可能還沒有在副本間同步,這些消息對于消費(fèi)者是不可見的。


        LEO(Log End Offset):下一條待寫入消息的位移



        副本間同步的過程依賴的就是HW和LEO的更新,以他們的值變化來演示副本同步消息的過程,綠色表示Leader副本,黃色表示Follower副本。


        首先,生產(chǎn)者不停地向Leader寫入數(shù)據(jù),這時候Leader的LEO可能已經(jīng)達(dá)到了10,但是HW依然是0,兩個Follower向Leader請求同步數(shù)據(jù),他們的值都是0。



        此時,F(xiàn)ollower再次向Leader拉取數(shù)據(jù),這時候Leader會更新自己的HW值,取Follower中的最小的LEO值來更新。



        之后,Leader響應(yīng)自己的HW給Follower,F(xiàn)ollower更新自己的HW值,因?yàn)橛掷〉搅讼?,所以再次更新LEO,流程以此類推。



        (九)Kafka為什么快?


        主要是3個方面:


        • 順序IO


        kafka寫消息到分區(qū)采用追加的方式,也就是順序?qū)懭氪疟P,不是隨機(jī)寫入,這個速度比普通的隨機(jī)IO快非常多,幾乎可以和網(wǎng)絡(luò)IO的速度相媲美。


        • Page Cache和零拷貝


        kafka在寫入消息數(shù)據(jù)的時候通過mmap內(nèi)存映射的方式,不是真正立刻寫入磁盤,而是利用操作系統(tǒng)的文件緩存PageCache異步寫入,提高了寫入消息的性能,另外在消費(fèi)消息的時候又通過sendfile實(shí)現(xiàn)了零拷貝。


        • 批量處理和壓縮


        Kafka在發(fā)送消息的時候不是一條條的發(fā)送的,而是會把多條消息合并成一個批次進(jìn)行處理發(fā)送,消費(fèi)消息也是一個道理,一次拉取一批次的消息進(jìn)行消費(fèi)。


        并且Producer、Broker、Consumer都使用了優(yōu)化后的壓縮算法,發(fā)送和消息消息使用壓縮節(jié)省了網(wǎng)絡(luò)傳輸?shù)拈_銷,Broker存儲使用壓縮則降低了磁盤存儲的空間。


        參考資料:

        1.《深入理解Kafka:核心設(shè)計(jì)實(shí)踐原理》

        2.狀態(tài)機(jī)程序設(shè)計(jì)套路

        3.raft算法源碼

        4.https://www.bbsmax.com/A/QW5Y3kaBzm/

        瀏覽 51
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            99精品久久毛片A片 | 情趣视频网站在线观看 | 欧美精品无码一区二区三区仓井松 | 懂色av懂色av粉嫩av无码 | AV天天插 | 蓝色七导航曰韩中文勉费满看 | 人和动物毛片 | 国产主播自拍网址在线观看免费 | 性生交裸片免费观看 | 免费网站观看www在线观看 |