kafka初探
Apache Kafka的流行歸功于它設(shè)計和操作簡單、存儲系統(tǒng)高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。
初識kafka集群結(jié)構(gòu):

kafaka集群的broker和Consumer都需要連接Zookeeper進行集群配置管理,Producer 直接連接 Broker。Producer把數(shù)據(jù)上傳到Broker,Producer可以指定數(shù)據(jù)有幾個分區(qū)、幾個備份。
上面的圖中,數(shù)據(jù)有兩個分區(qū) 0、1,每個分區(qū)都有自己的副本:0'、 1'。黃色的分區(qū)為leader,白色的為follower。
和其他消息隊列相比,Kafka的優(yōu)勢在哪里?
我們現(xiàn)在經(jīng)常提到 Kafka 的時候就已經(jīng)默認它是一個非常優(yōu)秀的消息隊列了,我們也會經(jīng)常拿它給 RocketMQ、RabbitMQ 對比。我覺得 Kafka 相比其他消息隊列主要的優(yōu)勢如下:
極致的性能 :基于 Scala 和 Java 語言開發(fā),設(shè)計中大量使用了批量處理和異步的思想,最高可以每秒處理千萬級別的消息。
生態(tài)系統(tǒng)兼容性無可匹敵 :Kafka 與周邊生態(tài)系統(tǒng)的兼容性是最好的沒有之一,尤其在大數(shù)據(jù)和流計算領(lǐng)域。
kafka的消息模型:發(fā)布-訂閱模型
主要區(qū)別傳統(tǒng)的queue模型中,消息只能被一個consumer消費的問題。

發(fā)布訂閱模型(Pub-Sub)使用主題(Topic) 作為消息通信載體,類似于廣播模式;發(fā)布者發(fā)布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的(不包括beging這種情況)。在發(fā)布 - 訂閱模型中,如果只有一個訂閱者,那它和隊列模型就基本是一樣的了。所以說,發(fā)布 - 訂閱模型在功能層面上是可以兼容隊列模型的。
kafka核心元件整理

producer:消息生產(chǎn)者
consumer:消息消費者
broker(代理):可以看作是一個獨立的kafka實例。多個kafka broker組成一個kafka cluster。
topic(主題):Kafka 將生產(chǎn)者發(fā)布的消息發(fā)送到 Topic(主題) 中,需要這些消息的消費者可以訂閱這些 Topic(主題),topic就是消息的分組。
partition(分區(qū)):Partition屬于Topic的一部分。一個Topic可以有多個Partition ,并且同一Topic下的Partition可以分布在不同的Broker上,這也就表明一個Topic可以橫跨多個Broker 。
Kafka 中的 Partition(分區(qū)) 實際上可以對應(yīng)成為消息隊列中的隊列queue。
kafka多副本機制
還有一點我覺得比較重要的是 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機制。分區(qū)(Partition)中的多個副本之間會有一個叫做leader節(jié)點,其他副本稱為follower。我們發(fā)送的消息會被發(fā)送到leader副本,然后follower 副本才能從 leader副本中拉取消息進行同步。
生產(chǎn)者和消費者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。當 leader 副本發(fā)生故障時會從 follower 中選舉出一個 leader,但是 follower 中如果有和 leader 同步程度達不到要求的參加不了 leader 的競選。

多分區(qū)(Partition)以及多副本(Replica)機制有什么好處
Kafka 通過給特定 Topic 指定多個 Partition, 而各個 Partition 可以分布在不同的 Broker 上, 這樣便能提供比較好的并發(fā)能力(負載均衡)。
Partition 可以指定對應(yīng)的 Replica 數(shù), 這也極大地提高了消息存儲的安全性, 提高了容災(zāi)能力,不過也相應(yīng)的增加了所需要的存儲空間。
kafka能保證消息的順序性嗎?
Kafka 中 Partition(分區(qū))是真正保存消息的地方,我們發(fā)送的消息都被放在了這里。每次添加消息到 Partition(分區(qū)) 的時候都會采用尾加法,Kafka 只能為我們保證 Partition(分區(qū)) 中的消息有序,而不能保證 Topic(主題) 中的 Partition(分區(qū)) 的有序。

消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移量(offset)。Kafka 通過偏移量(offset)來保證消息在分區(qū)內(nèi)的順序性。所以,我們就有一種很簡單的保證消息消費順序的方法:1 個 Topic 只對應(yīng)一個 Partition。這樣當然可以解決問題,但是破壞了 Kafka 的設(shè)計初衷。
Kafka 中發(fā)送 1 條消息的時候,可以指定 topic, partition, key,data(數(shù)據(jù)) 4 個參數(shù)。如果在發(fā)送消息的時候指定了 Partition的話,所有消息都會被發(fā)送到指定的 Partition。并且,同一個 key 的消息可以保證只發(fā)送到同一個partition,可以采用表/對象的 id 來作為key 。
總結(jié)一下,對于如何保證 Kafka 中消息消費的順序,有了下面兩種方法:
1個Topic只對應(yīng)一個 Partition。
發(fā)送消息的時候指定 key/Partition。(推薦)
當然不僅僅只有上面兩種方法,比如如果producer單點固定,可以為每條消息綁定一個順序號等。需要根據(jù)具體場景適用。
如何保證消息安全不丟失
producer端消息丟失
生產(chǎn)者(Producer) 調(diào)用send方法發(fā)送消息之后,消息可能因為網(wǎng)絡(luò)問題并沒有發(fā)送過去。
所以,我們不能默認在調(diào)用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是 Kafka 生產(chǎn)者(Producer) 使用 send 方法發(fā)送消息實際上是異步的操作,我們可以通過 get()方法獲取調(diào)用結(jié)果,但是這樣也讓它變?yōu)榱送讲僮鳎?/p>SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() != null) { logger.info("生產(chǎn)者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe sult.getProducerRecord().value().toString());}
但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產(chǎn)者發(fā)送消失敗,原因:{}", ex.getMessage()));
如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可.
另外這里推薦為 Producer 的retries (重試次數(shù))設(shè)置一個比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設(shè)置比較大一點。設(shè)置完成之后,當出現(xiàn)網(wǎng)絡(luò)問題之后能夠自動重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因為間隔太小的話重試的效果就不明顯了,網(wǎng)絡(luò)波動一次你3次一下子就重試完了.
Consumer端消息丟失
消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。

短讀風險:當消費者拉取到了分區(qū)的某個消息之后,消費者會自動提交了 offset。自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上并沒有被消費,但是 offset 卻被自動提交了。
長讀風險:為了解決消息少消費的問題,我們手動關(guān)閉自動提交 offset,每次在真正消費完消息之后之后再自己手動提交 offset 。 但是,這樣會帶來消息被重新消費的問題。比如你剛剛消費完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個消息理論上就會被消費兩次。
集群故障導致消息丟失
由于partition中的消息都是通過leader節(jié)點復制到follower節(jié)點的,假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。
解決辦法就是我們設(shè)置
acks = all
acks 是 Kafka 生產(chǎn)者(Producer) 很重要的一個參數(shù)。acks 的默認值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。
replication.factor >= 3
為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個 分區(qū)(partition) 至少有 3 個副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。
min.insync.replicas > 1
一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個副本才算是被成功發(fā)送。min.insync.replicas的默認值為 1 ,在實際生產(chǎn)中應(yīng)盡量避免默認值 1。但是,為了保證整個 Kafka 服務(wù)的高可用性,需要確保 replication.factor > min.insync.replicas 。主要是為了應(yīng)對只要是有一個副本掛掉,整個分區(qū)就無法正常工作的情況。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。
unclean.leader.election.enable = false
Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認值由原來的true 改為false
我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。多個 follower副本之間的消息同步情況不一樣,當我們配置了 unclean.leader.election.enable = false 的話,當 leader 副本發(fā)生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。
總結(jié)
生產(chǎn)端 | 消費端 | 集群配置 |
將異步發(fā)送改為同步發(fā)送send().get() | 通過自動提交offset,存在短讀風險 | acks=all,完成所有副本的寫 |
通過添加回調(diào)函數(shù)future=send() | 通過手動提交offset,存在長讀風險 | |
配置retries重試次數(shù) |
zookeeper在kafka中的作用
zookeeper為kafka提供了配置元數(shù)據(jù)的管理。

每個存儲節(jié)點的含義:

broker注冊
Broker是分布式部署并且相互之間相互獨立,但是需要有一個注冊系統(tǒng)能夠?qū)⒄麄€集群中的Broker管理起來,此時就使用到了Zookeeper。在Zookeeper上會有一個專門用來進行Broker服務(wù)器列表記錄的節(jié)點:
/brokers/ids
每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點,如/brokers/ids/[0...N]。Kafka使用了全局唯一的數(shù)字來指代每個Broker服務(wù)器,不同的Broker必須使用不同的Broker ID進行注冊,創(chuàng)建完節(jié)點后,每個Broker就會將自己的IP地址和端口信息記錄到該節(jié)點中去。其中Broker創(chuàng)建的節(jié)點類型是臨時節(jié)點,一旦Broker宕機,則對應(yīng)的臨時節(jié)點也會被自動刪除。
topic注冊
在Kafka中,同一個Topic的消息會被分成多個分區(qū)并將其分布在多個Broker上,這些分區(qū)信息及與Broker的對應(yīng)關(guān)系也都是由Zookeeper在維護,由專門的節(jié)點來記錄,如:
/borkers/topics
Kafka中每個Topic都會以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務(wù)器啟動后,會到對應(yīng)Topic節(jié)點(/brokers/topics)上注冊自己的Broker ID并寫入針對該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個節(jié)點表示Broker ID為3的一個Broker服務(wù)器,對于"login"這個Topic的消息,提供了2個分區(qū)進行消息存儲,同樣,這個分區(qū)節(jié)點也是臨時節(jié)點。
負載均衡
生產(chǎn)者負載均衡
由于同一個Topic消息會被分區(qū)并將其分布在多個Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實現(xiàn)生產(chǎn)者的負載均衡,Kafka支持傳統(tǒng)的四層負載均衡,也支持Zookeeper方式實現(xiàn)負載均衡。
四層負載均衡
根據(jù)生產(chǎn)者的IP地址和端口來為其確定一個相關(guān)聯(lián)的Broker。通常,一個生產(chǎn)者只會對應(yīng)單個Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡單,每個生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護單個TCP連接即可。但是,其無法做到真正的負載均衡,因為實際系統(tǒng)中的每個生產(chǎn)者產(chǎn)生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠多于其他生產(chǎn)者的話,那么會導致不同的Broker接收到的消息總數(shù)差異巨大,同時,生產(chǎn)者也無法實時感知到Broker的新增和刪除。
使用Zookeeper進行負載均衡
由于每個Broker啟動時,都會完成Broker注冊過程,生產(chǎn)者會通過該節(jié)點的變化來動態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實現(xiàn)動態(tài)的負載均衡機制。
消費者負載均衡
與生產(chǎn)者類似,Kafka中的消費者同樣需要進行負載均衡來實現(xiàn)多個消費者合理地從對應(yīng)的Broker服務(wù)器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發(fā)送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。
記錄partition與consumer之間的分組group關(guān)系
消費組 (Consumer Group):consumer group 下有多個 Consumer(消費者)。
對于每個消費者組 (Consumer Group),Kafka都會為其分配一個全局唯一的Group ID,Group 內(nèi)部的所有消費者共享該 ID。訂閱的topic下的每個分區(qū)只能分配給某個 group 下的一個consumer(當然該分區(qū)還可以被分配給其他group)。同時,Kafka為每個消費者分配一個Consumer ID,通常用"Hostname:UUID"形式表示。
在Kafka中,規(guī)定了每個消息分區(qū)只能被同組的一個消費者進行消費,因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個消費者一旦確定了對一個消息分區(qū)的消費權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對應(yīng)消息分區(qū)的臨時節(jié)點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]。其中,[broker_id-partition_id]就是一個 消息分區(qū) 的標識,節(jié)點內(nèi)容就是該 消息分區(qū) 上 消費者的Consumer ID。
記錄消息偏移量offset
在消費者對指定消息分區(qū)進行消息消費的過程中,需要定時地將分區(qū)消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啟或者其他消費者重新接管該消息分區(qū)的消息消費后,能夠從之前的進度開始繼續(xù)進行消息消費。Offset在Zookeeper中由一個專門節(jié)點進行記錄,其節(jié)點路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],節(jié)點內(nèi)容就是Offset的值。
消費者注冊信息
消費者服務(wù)器在初始化啟動時加入消費者分組的步驟如下
1、注冊到消費者分組。每個消費者服務(wù)器啟動時,都會到Zookeeper的指定節(jié)點下創(chuàng)建一個屬于自己的消費者節(jié)點,例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點創(chuàng)建后,消費者就會將自己訂閱的Topic信息寫入該臨時節(jié)點。
2、對 消費者分組 中的 消費者 的變化注冊監(jiān)聽。每個 消費者 都需要關(guān)注所屬 消費者分組 中其他消費者服務(wù)器的變化情況,即對/consumers/[group_id]/ids節(jié)點注冊子節(jié)點變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費者新增或減少,就觸發(fā)消費者的負載均衡。
3、對Broker服務(wù)器變化注冊監(jiān)聽。消費者需要對/broker/ids/[0-N]中的節(jié)點進行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進行消費者負載均衡。
4、進行消費者負載均衡。為了讓同一個Topic下不同分區(qū)的消息盡量均衡地被多個 消費者 消費而進行 消費者 與 消息 分區(qū)分配的過程,通常,對于一個消費者分組,如果組內(nèi)的消費者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會發(fā)出消費者負載均衡。
