從源碼分析如何優(yōu)雅的使用 Kafka 生產(chǎn)者
本文公眾號(hào)來源:crossoverJie作者:crossoverJie本文已收錄至我的GitHub前言
其中有朋友咨詢?cè)诖罅肯⒌那闆r下 Kakfa 是如何保證消息的高效及一致性呢?
正好以這個(gè)問題結(jié)合 Kakfa 的源碼討論下如何正確、高效的發(fā)送消息。
簡單的消息發(fā)送內(nèi)容較多,對(duì)源碼感興趣的朋友請(qǐng)系好安全帶?(源碼基于
v0.10.0.0版本分析)。同時(shí)最好是有一定的 Kafka 使用經(jīng)驗(yàn),知曉基本的用法。
在分析之前先看一個(gè)簡單的消息發(fā)送是怎么樣的。
以下代碼基于 SpringBoot 構(gòu)建。
首先創(chuàng)建一個(gè) org.apache.kafka.clients.producer.Producer 的 bean。

主要關(guān)注 bootstrap.servers,它是必填參數(shù)。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094。
其余幾個(gè)參數(shù)暫時(shí)不做討論,后文會(huì)有詳細(xì)介紹。
接著注入這個(gè) bean 即可調(diào)用它的發(fā)送函數(shù)發(fā)送消息。

這里我給某一個(gè) Topic 發(fā)送了 10W 條數(shù)據(jù),運(yùn)行程序消息正常發(fā)送。
但這僅僅只是做到了消息發(fā)送,對(duì)消息是否成功送達(dá)完全沒管,等于是純 異步的方式。
同步
那么我想知道消息到底發(fā)送成功沒有該怎么辦呢?
其實(shí) Producer 的 API 已經(jīng)幫我們考慮到了,發(fā)送之后只需要調(diào)用它的 get() 方法即可同步獲取發(fā)送結(jié)果。

發(fā)送結(jié)果:

這樣的發(fā)送效率其實(shí)是比較低下的,因?yàn)槊看味夹枰降却l(fā)送的結(jié)果。
異步
為此我們應(yīng)當(dāng)采取異步的方式發(fā)送,其實(shí) send() 方法默認(rèn)則是異步的,只要不手動(dòng)調(diào)用 get() 方法。
但這樣就沒法獲知發(fā)送結(jié)果。
所以查看 send() 的 API 可以發(fā)現(xiàn)還有一個(gè)參數(shù)。
Future<RecordMetadata> send(ProducerRecord<K, V> producer,Callback callback);
Callback 是一個(gè)回調(diào)接口,在消息發(fā)送完成之后可以回調(diào)我們自定義的實(shí)現(xiàn)。

執(zhí)行之后的結(jié)果:

同樣的也能獲取結(jié)果,同時(shí)發(fā)現(xiàn)回調(diào)的線程并不是上文同步時(shí)的 主線程,這樣也能證明是異步回調(diào)的。
同時(shí)回調(diào)的時(shí)候會(huì)傳遞兩個(gè)參數(shù):
RecordMetadata?和上文一致的消息發(fā)送成功后的元數(shù)據(jù)。Exception?消息發(fā)送過程中的異常信息。
但是這兩個(gè)參數(shù)并不會(huì)同時(shí)都有數(shù)據(jù),只有發(fā)送失敗才會(huì)有異常信息,同時(shí)發(fā)送元數(shù)據(jù)為空。
所以正確的寫法應(yīng)當(dāng)是:

源碼分析至于為什么會(huì)只有參數(shù)一個(gè)有值,在下文的源碼分析中會(huì)一一解釋。
現(xiàn)在只掌握了基本的消息發(fā)送,想要深刻的理解發(fā)送中的一些參數(shù)配置還是得源碼說了算。
首先還是來談?wù)勏l(fā)送時(shí)的整個(gè)流程是怎么樣的, Kafka 并不是簡單的把消息通過網(wǎng)絡(luò)發(fā)送到了 broker中,在 Java 內(nèi)部還是經(jīng)過了許多優(yōu)化和設(shè)計(jì)。
發(fā)送流程
為了直觀的了解發(fā)送的流程,簡單的畫了幾個(gè)在發(fā)送過程中關(guān)鍵的步驟。

從上至下依次是:
初始化以及真正發(fā)送消息的?
kafka-producer-network-thread?IO 線程。將消息序列化。
得到需要發(fā)送的分區(qū)。
寫入內(nèi)部的一個(gè)緩存區(qū)中。
初始化的 IO 線程不斷的消費(fèi)這個(gè)緩存來發(fā)送消息。
步驟解析
接下來詳解每個(gè)步驟。
初始化

調(diào)用該構(gòu)造方法進(jìn)行初始化時(shí),不止是簡單的將基本參數(shù)寫入 KafkaProducer。比較麻煩的是初始化 Sender 線程進(jìn)行緩沖區(qū)消費(fèi)。
初始化 IO 線程處:

可以看到 Sender 線程有需要成員變量,比如:
acks,retries,requestTimeout
等,這些參數(shù)會(huì)在后文分析。
序列化消息
在調(diào)用 send() 函數(shù)后其實(shí)第一步就是序列化,畢竟我們的消息需要通過網(wǎng)絡(luò)才能發(fā)送到 Kafka。

其中的 valueSerializer.serialize(record.topic(),record.value()); 是一個(gè)接口,我們需要在初始化時(shí)候指定序列化實(shí)現(xiàn)類。

我們也可以自己實(shí)現(xiàn)序列化,只需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Serializer 接口即可。
路由分區(qū)
接下來就是路由分區(qū),通常我們使用的 Topic 為了實(shí)現(xiàn)擴(kuò)展性以及高性能都會(huì)創(chuàng)建多個(gè)分區(qū)。
如果是一個(gè)分區(qū)好說,所有消息都往里面寫入即可。
但多個(gè)分區(qū)就不可避免需要知道寫入哪個(gè)分區(qū)。
通常有三種方式。
指定分區(qū)
可以在構(gòu)建 ProducerRecord 為每條消息指定分區(qū)。

這樣在路由時(shí)會(huì)判斷是否有指定,有就直接使用該分區(qū)。

這種一般在特殊場景下會(huì)使用。
自定義路由策略

如果沒有指定分區(qū),則會(huì)調(diào)用 partitioner.partition 接口執(zhí)行自定義分區(qū)策略。
而我們也只需要自定義一個(gè)類實(shí)現(xiàn) org.apache.kafka.clients.producer.Partitioner 接口,同時(shí)在創(chuàng)建 KafkaProducer 實(shí)例時(shí)配置 partitioner.class 參數(shù)。

通常需要自定義分區(qū)一般是在想盡量的保證消息的順序性。
或者是寫入某些特有的分區(qū),由特別的消費(fèi)者來進(jìn)行處理等。
默認(rèn)策略
最后一種則是默認(rèn)的路由策略,如果我們啥都沒做就會(huì)執(zhí)行該策略。
該策略也會(huì)使得消息分配的比較均勻。
來看看它的實(shí)現(xiàn):

簡單的來說分為以下幾步:
獲取 Topic 分區(qū)數(shù)。
將內(nèi)部維護(hù)的一個(gè)線程安全計(jì)數(shù)器 +1。
與分區(qū)數(shù)取模得到分區(qū)編號(hào)。
其實(shí)這就是很典型的輪詢算法,所以只要分區(qū)數(shù)不頻繁變動(dòng)這種方式也會(huì)比較均勻。
寫入內(nèi)部緩存
在 send() 方法拿到分區(qū)后會(huì)調(diào)用一個(gè) append() 函數(shù):

該函數(shù)中會(huì)調(diào)用一個(gè) getOrCreateDeque() 寫入到一個(gè)內(nèi)部緩存中 batches。

消費(fèi)緩存
在最開始初始化的 IO 線程其實(shí)是一個(gè)守護(hù)線程,它會(huì)一直消費(fèi)這些數(shù)據(jù)。

通過圖中的幾個(gè)函數(shù)會(huì)獲取到之前寫入的數(shù)據(jù)。這塊內(nèi)容可以不必深究,但其中有個(gè) completeBatch 方法卻非常關(guān)鍵。

調(diào)用該方法時(shí)候肯定已經(jīng)是消息發(fā)送完畢了,所以會(huì)調(diào)用 batch.done() 來完成之前我們?cè)?send() 方法中定義的回調(diào)接口。

Producer 參數(shù)解析從這里也可以看出為什么之前說發(fā)送完成后元數(shù)據(jù)和異常信息只會(huì)出現(xiàn)一個(gè)。
發(fā)送流程講完了再來看看 Producer 中比較重要的幾個(gè)參數(shù)。
acks
acks 是一個(gè)影響消息吞吐量的一個(gè)關(guān)鍵參數(shù)。

主要有 [all、-1,0,1] 這幾個(gè)選項(xiàng),默認(rèn)為 1。
由于 Kafka 不是采取的主備模式,而是采用類似于 Zookeeper 的主備模式。
前提是
Topic配置副本數(shù)量replica>1。
當(dāng) acks=all/-1 時(shí):
意味著會(huì)確保所有的 follower 副本都完成數(shù)據(jù)的寫入才會(huì)返回。
這樣可以保證消息不會(huì)丟失!
但同時(shí)性能和吞吐量卻是最低的。
當(dāng) acks=0 時(shí):
producer 不會(huì)等待副本的任何響應(yīng),這樣最容易丟失消息但同時(shí)性能卻是最好的!
當(dāng) acks=1 時(shí):
這是一種折中的方案,它會(huì)等待副本 Leader 響應(yīng),但不會(huì)等到 follower 的響應(yīng)。
一旦 Leader 掛掉消息就會(huì)丟失。但性能和消息安全性都得到了一定的保證。
batch.size
這個(gè)參數(shù)看名稱就知道是內(nèi)部緩存區(qū)的大小限制,對(duì)他適當(dāng)?shù)恼{(diào)大可以提高吞吐量。
但也不能極端,調(diào)太大會(huì)浪費(fèi)內(nèi)存。小了也發(fā)揮不了作用,也是一個(gè)典型的時(shí)間和空間的權(quán)衡。


上圖是幾個(gè)使用的體現(xiàn)。
retries
retries 該參數(shù)主要是來做重試使用,當(dāng)發(fā)生一些網(wǎng)絡(luò)抖動(dòng)都會(huì)造成重試。
這個(gè)參數(shù)也就是限制重試次數(shù)。
但也有一些其他問題。
因?yàn)槭侵匕l(fā)所以消息順序可能不會(huì)一致,這也是上文提到就算是一個(gè)分區(qū)消息也不會(huì)是完全順序的情況。
還是由于網(wǎng)絡(luò)問題,本來消息已經(jīng)成功寫入了但是沒有成功響應(yīng)給 producer,進(jìn)行重試時(shí)就可能會(huì)出現(xiàn)?
消息重復(fù)。這種只能是消費(fèi)者進(jìn)行冪等處理。
如果消息量真的非常大,同時(shí)又需要盡快的將消息發(fā)送到 Kafka。一個(gè) producer 始終會(huì)收到緩存大小等影響。
那是否可以創(chuàng)建多個(gè) producer 來進(jìn)行發(fā)送呢?
配置一個(gè)最大 producer 個(gè)數(shù)。
發(fā)送消息時(shí)首先獲取一個(gè)?
producer,獲取的同時(shí)判斷是否達(dá)到最大上限,沒有就新建一個(gè)同時(shí)保存到內(nèi)部的?List中,保存時(shí)做好同步處理防止并發(fā)問題。獲取發(fā)送者時(shí)可以按照默認(rèn)的分區(qū)策略使用輪詢的方式獲?。ūWC使用均勻)。
這樣在大量、頻繁的消息發(fā)送場景中可以提高發(fā)送效率減輕單個(gè) producer 的壓力。
最后則是 Producer 的關(guān)閉,Producer 在使用過程中消耗了不少資源(線程、內(nèi)存、網(wǎng)絡(luò)等)因此需要顯式的關(guān)閉從而回收這些資源。

默認(rèn)的 close() 方法和帶有超時(shí)時(shí)間的方法都是在一定的時(shí)間后強(qiáng)制關(guān)閉。
但在過期之前都會(huì)處理完剩余的任務(wù)。
所以使用哪一個(gè)得視情況而定。
總結(jié)本文內(nèi)容較多,從實(shí)例和源碼的角度分析了 Kafka 生產(chǎn)者。
希望看完的朋友能有收獲,同時(shí)也歡迎留言討論。
不出意外下期會(huì)討論 Kafka 消費(fèi)者。
如果對(duì)你有幫助還請(qǐng)分享讓更多的人看到。
如果大家想要實(shí)時(shí)關(guān)注我更新的文章以及分享的干貨的話,可以關(guān)注我的公眾號(hào)Java3y。
?獲取海量視頻資源

獲取Java精美腦圖

?獲取Java學(xué)習(xí)路線

獲取開發(fā)常用工具

?精美整理好的PDF電子書

在公眾號(hào)下回復(fù)「888」即可獲?。?!

點(diǎn)個(gè)在看
,分享到朋友圈
,對(duì)我真的很重要?。?/strong>
