1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        kafka初探

        共 7869字,需瀏覽 16分鐘

         ·

        2021-04-02 18:00

        Apache Kafka的流行歸功于它設(shè)計和操作簡單、存儲系統(tǒng)高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。

        初識kafka集群結(jié)構(gòu):


            kafaka集群的broker和Consumer都需要連接Zookeeper進行集群配置管理,Producer 直接連接 Broker。Producer把數(shù)據(jù)上傳到Broker,Producer可以指定數(shù)據(jù)有幾個分區(qū)、幾個備份。

            上面的圖中,數(shù)據(jù)有兩個分區(qū) 0、1,每個分區(qū)都有自己的副本:0'、 1'。黃色的分區(qū)為leader,白色的為follower。

        和其他消息隊列相比,Kafka的優(yōu)勢在哪里?

        我們現(xiàn)在經(jīng)常提到 Kafka 的時候就已經(jīng)默認它是一個非常優(yōu)秀的消息隊列了,我們也會經(jīng)常拿它給 RocketMQ、RabbitMQ 對比。我覺得 Kafka 相比其他消息隊列主要的優(yōu)勢如下:

        1. 極致的性能 :基于 Scala 和 Java 語言開發(fā),設(shè)計中大量使用了批量處理和異步的思想,最高可以每秒處理千萬級別的消息。

        2. 生態(tài)系統(tǒng)兼容性無可匹敵 :Kafka 與周邊生態(tài)系統(tǒng)的兼容性是最好的沒有之一,尤其在大數(shù)據(jù)和流計算領(lǐng)域。


        kafka的消息模型:發(fā)布-訂閱模型

        主要區(qū)別傳統(tǒng)的queue模型中,消息只能被一個consumer消費的問題。

        發(fā)布訂閱模型(Pub-Sub)使用主題(Topic) 作為消息通信載體,類似于廣播模式;發(fā)布者發(fā)布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的(不包括beging這種情況)。在發(fā)布 - 訂閱模型中,如果只有一個訂閱者,那它和隊列模型就基本是一樣的了。所以說,發(fā)布 - 訂閱模型在功能層面上是可以兼容隊列模型的。

        kafka核心元件整理

        • producer:消息生產(chǎn)者

        • consumer:消息消費者

        • broker(代理):可以看作是一個獨立的kafka實例。多個kafka broker組成一個kafka cluster。

        • topic(主題):Kafka 將生產(chǎn)者發(fā)布的消息發(fā)送到 Topic(主題) 中,需要這些消息的消費者可以訂閱這些 Topic(主題),topic就是消息的分組。

        • partition(分區(qū)):Partition屬于Topic的一部分。一個Topic可以有多個Partition ,并且同一Topic下的Partition可以分布在不同的Broker上,這也就表明一個Topic可以橫跨多個Broker 。

        Kafka 中的 Partition(分區(qū)) 實際上可以對應(yīng)成為消息隊列中的隊列queue。

        kafka多副本機制

        還有一點我覺得比較重要的是 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機制。分區(qū)(Partition)中的多個副本之間會有一個叫做leader節(jié)點,其他副本稱為follower。我們發(fā)送的消息會被發(fā)送到leader副本,然后follower 副本才能從 leader副本中拉取消息進行同步。

        生產(chǎn)者和消費者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。當 leader 副本發(fā)生故障時會從 follower 中選舉出一個 leader,但是 follower 中如果有和 leader 同步程度達不到要求的參加不了 leader 的競選。

        多分區(qū)(Partition)以及多副本(Replica)機制有什么好處

        • Kafka 通過給特定 Topic 指定多個 Partition, 而各個 Partition 可以分布在不同的 Broker 上, 這樣便能提供比較好的并發(fā)能力(負載均衡)。

        • Partition 可以指定對應(yīng)的 Replica 數(shù), 這也極大地提高了消息存儲的安全性, 提高了容災(zāi)能力,不過也相應(yīng)的增加了所需要的存儲空間。

        kafka能保證消息的順序性嗎?

        Kafka 中 Partition(分區(qū))是真正保存消息的地方,我們發(fā)送的消息都被放在了這里。每次添加消息到 Partition(分區(qū)) 的時候都會采用尾加法,Kafka 只能為我們保證 Partition(分區(qū)) 中的消息有序,而不能保證 Topic(主題) 中的 Partition(分區(qū)) 的有序。

        消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移量(offset)。Kafka 通過偏移量(offset)來保證消息在分區(qū)內(nèi)的順序性。所以,我們就有一種很簡單的保證消息消費順序的方法:1 個 Topic 只對應(yīng)一個 Partition。這樣當然可以解決問題,但是破壞了 Kafka 的設(shè)計初衷。

        Kafka 中發(fā)送 1 條消息的時候,可以指定 topic, partition, key,data(數(shù)據(jù)) 4 個參數(shù)。如果在發(fā)送消息的時候指定了 Partition的話,所有消息都會被發(fā)送到指定的 Partition。并且,同一個 key 的消息可以保證只發(fā)送到同一個partition,可以采用表/對象的 id 來作為key 。

        總結(jié)一下,對于如何保證 Kafka 中消息消費的順序,有了下面兩種方法:

        • 1個Topic只對應(yīng)一個 Partition。

        • 發(fā)送消息的時候指定 key/Partition。(推薦)

        當然不僅僅只有上面兩種方法,比如如果producer單點固定,可以為每條消息綁定一個順序號等。需要根據(jù)具體場景適用。

        如何保證消息安全不丟失

        producer端消息丟失

        生產(chǎn)者(Producer) 調(diào)用send方法發(fā)送消息之后,消息可能因為網(wǎng)絡(luò)問題并沒有發(fā)送過去。

        所以,我們不能默認在調(diào)用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是 Kafka 生產(chǎn)者(Producer) 使用  send 方法發(fā)送消息實際上是異步的操作,我們可以通過 get()方法獲取調(diào)用結(jié)果,但是這樣也讓它變?yōu)榱送讲僮鳎?/p>

        SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() != null) {  logger.info("生產(chǎn)者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe              sult.getProducerRecord().value().toString());}

        但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);        future.addCallback(result -> logger.info("生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),                ex -> logger.error("生產(chǎn)者發(fā)送消失敗,原因:{}", ex.getMessage()));

        如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可.

        另外這里推薦為 Producer 的retries (重試次數(shù))設(shè)置一個比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設(shè)置比較大一點。設(shè)置完成之后,當出現(xiàn)網(wǎng)絡(luò)問題之后能夠自動重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因為間隔太小的話重試的效果就不明顯了,網(wǎng)絡(luò)波動一次你3次一下子就重試完了.

        Consumer端消息丟失

        消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。

        短讀風險:當消費者拉取到了分區(qū)的某個消息之后,消費者會自動提交了 offset。自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上并沒有被消費,但是 offset 卻被自動提交了。

        長讀風險:為了解決消息少消費的問題,我們手動關(guān)閉自動提交 offset,每次在真正消費完消息之后之后再自己手動提交 offset 。 但是,這樣會帶來消息被重新消費的問題。比如你剛剛消費完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個消息理論上就會被消費兩次。

        集群故障導致消息丟失

        由于partition中的消息都是通過leader節(jié)點復制到follower節(jié)點的,假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。

        解決辦法就是我們設(shè)置 

        • acks = all

        acks 是 Kafka 生產(chǎn)者(Producer) 很重要的一個參數(shù)。acks 的默認值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。

        • replication.factor >= 3

        為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個 分區(qū)(partition) 至少有 3 個副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。

        • min.insync.replicas > 1

        一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個副本才算是被成功發(fā)送。min.insync.replicas的默認值為 1 ,在實際生產(chǎn)中應(yīng)盡量避免默認值 1。但是,為了保證整個 Kafka 服務(wù)的高可用性,需要確保 replication.factor > min.insync.replicas 。主要是為了應(yīng)對只要是有一個副本掛掉,整個分區(qū)就無法正常工作的情況。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。

        • unclean.leader.election.enable = false

        Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認值由原來的true 改為false

        我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。多個 follower副本之間的消息同步情況不一樣,當我們配置了 unclean.leader.election.enable = false 的話,當 leader 副本發(fā)生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。

        總結(jié)

        生產(chǎn)端

        消費端

        集群配置

        將異步發(fā)送改為同步發(fā)送send().get()

        通過自動提交offset,存在短讀風險

        acks=all,完成所有副本的寫

        通過添加回調(diào)函數(shù)future=send()

        通過手動提交offset,存在長讀風險


        配置retries重試次數(shù)



        zookeeper在kafka中的作用

        zookeeper為kafka提供了配置元數(shù)據(jù)的管理。

        每個存儲節(jié)點的含義:

        broker注冊

        Broker是分布式部署并且相互之間相互獨立,但是需要有一個注冊系統(tǒng)能夠?qū)⒄麄€集群中的Broker管理起來,此時就使用到了Zookeeper。在Zookeeper上會有一個專門用來進行Broker服務(wù)器列表記錄的節(jié)點:

        /brokers/ids

        每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數(shù)字來指代每個Broker服務(wù)器,不同的Broker必須使用不同的Broker ID進行注冊,創(chuàng)建完節(jié)點后,每個Broker就會將自己的IP地址和端口信息記錄到該節(jié)點中去。其中Broker創(chuàng)建的節(jié)點類型是臨時節(jié)點,一旦Broker宕機,則對應(yīng)的臨時節(jié)點也會被自動刪除。

        topic注冊

        在Kafka中,同一個Topic的消息會被分成多個分區(qū)并將其分布在多個Broker上,這些分區(qū)信息及與Broker的對應(yīng)關(guān)系也都是由Zookeeper在維護,由專門的節(jié)點來記錄,如:

        /borkers/topics

        Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務(wù)器啟動后,會到對應(yīng)Topic節(jié)點(/brokers/topics)上注冊自己的Broker ID并寫入針對該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個節(jié)點表示Broker ID為3的一個Broker服務(wù)器,對于"login"這個Topic的消息,提供了2個分區(qū)進行消息存儲,同樣,這個分區(qū)節(jié)點也是臨時節(jié)點。

        負載均衡

        生產(chǎn)者負載均衡

        由于同一個Topic消息會被分區(qū)并將其分布在多個Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實現(xiàn)生產(chǎn)者的負載均衡,Kafka支持傳統(tǒng)的四層負載均衡,也支持Zookeeper方式實現(xiàn)負載均衡。

        • 四層負載均衡

        根據(jù)生產(chǎn)者的IP地址和端口來為其確定一個相關(guān)聯(lián)的Broker。通常,一個生產(chǎn)者只會對應(yīng)單個Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡單,每個生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統(tǒng)中的每個生產(chǎn)者產(chǎn)生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠多于其他生產(chǎn)者的話,那么會導致不同的Broker接收到的消息總數(shù)差異巨大,同時,生產(chǎn)者也無法實時感知到Broker的新增和刪除。

        • 使用Zookeeper進行負載均衡

        由于每個Broker啟動時,都會完成Broker注冊過程,生產(chǎn)者會通過該節(jié)點的變化來動態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實現(xiàn)動態(tài)的負載均衡機制。

        消費者負載均衡

        與生產(chǎn)者類似,Kafka中的消費者同樣需要進行負載均衡來實現(xiàn)多個消費者合理地從對應(yīng)的Broker服務(wù)器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發(fā)送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。

        記錄partition與consumer之間的分組group關(guān)系

        消費組 (Consumer Group):consumer group 下有多個 Consumer(消費者)。

        對于每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內(nèi)部的所有消費者共享該 ID。訂閱的topic下的每個分區(qū)只能分配給某個 group 下的一個consumer(當然該分區(qū)還可以被分配給其他group)。同時,Kafka為每個消費者分配一個Consumer ID,通常用"Hostname:UUID"形式表示。

        在Kafka中,規(guī)定了每個消息分區(qū)只能被同組的一個消費者進行消費,因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個消費者一旦確定了對一個消息分區(qū)的消費權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對應(yīng)消息分區(qū)的臨時節(jié)點上,例如:

        /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]。其中,[broker_id-partition_id]就是一個 消息分區(qū) 的標識,節(jié)點內(nèi)容就是該 消息分區(qū) 上 消費者的Consumer ID。

        記錄消息偏移量offset

        在消費者對指定消息分區(qū)進行消息消費的過程中,需要定時地將分區(qū)消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啟或者其他消費者重新接管該消息分區(qū)的消息消費后,能夠從之前的進度開始繼續(xù)進行消息消費。Offset在Zookeeper中由一個專門節(jié)點進行記錄,其節(jié)點路徑為:

        /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],節(jié)點內(nèi)容就是Offset的值。

        消費者注冊信息

        消費者服務(wù)器在初始化啟動時加入消費者分組的步驟如下

        1、注冊到消費者分組。每個消費者服務(wù)器啟動時,都會到Zookeeper的指定節(jié)點下創(chuàng)建一個屬于自己的消費者節(jié)點,例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點創(chuàng)建后,消費者就會將自己訂閱的Topic信息寫入該臨時節(jié)點。

        2、對 消費者分組 中的 消費者 的變化注冊監(jiān)聽。每個 消費者 都需要關(guān)注所屬 消費者分組 中其他消費者服務(wù)器的變化情況,即對/consumers/[group_id]/ids節(jié)點注冊子節(jié)點變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費者新增或減少,就觸發(fā)消費者的負載均衡。

        3、對Broker服務(wù)器變化注冊監(jiān)聽。消費者需要對/broker/ids/[0-N]中的節(jié)點進行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進行消費者負載均衡。

        4、進行消費者負載均衡。為了讓同一個Topic下不同分區(qū)的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區(qū)分配的過程,通常,對于一個消費者分組,如果組內(nèi)的消費者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會發(fā)出消費者負載均衡。



        瀏覽 71
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            97人妻精品一区二区三区软件 | 国产嗷嗷叫 | 97香蕉 | 无码精品一区二区三区四区找到 | 欧洲黄网 | 娇妻互换享受高潮 | 亚洲精品成人网站 | 日日日日做夜夜夜夜做电影 | 女人久久| 日韩精品成人电影 |