1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        從源碼分析如何優(yōu)雅的使用 Kafka 生產(chǎn)者

        共 3822字,需瀏覽 8分鐘

         ·

        2020-02-06 23:22


        本文公眾號(hào)來源:crossoverJie作者:crossoverJie本文已收錄至我的GitHub前言

        其中有朋友咨詢?cè)诖罅肯⒌那闆r下 Kakfa 是如何保證消息的高效及一致性呢?

        正好以這個(gè)問題結(jié)合 Kakfa 的源碼討論下如何正確、高效的發(fā)送消息。

        內(nèi)容較多,對(duì)源碼感興趣的朋友請(qǐng)系好安全帶?(源碼基于 v0.10.0.0 版本分析)。同時(shí)最好是有一定的 Kafka 使用經(jīng)驗(yàn),知曉基本的用法。

        簡單的消息發(fā)送

        在分析之前先看一個(gè)簡單的消息發(fā)送是怎么樣的。

        以下代碼基于 SpringBoot 構(gòu)建。

        首先創(chuàng)建一個(gè) org.apache.kafka.clients.producer.Producer 的 bean。

        a6b73442d0c46b46126980fb82c843da.webp

        主要關(guān)注 bootstrap.servers,它是必填參數(shù)。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094

        其余幾個(gè)參數(shù)暫時(shí)不做討論,后文會(huì)有詳細(xì)介紹。

        接著注入這個(gè) bean 即可調(diào)用它的發(fā)送函數(shù)發(fā)送消息。

        e69e4c6ff47f4ee8af31d980b0fa5200.webp

        這里我給某一個(gè) Topic 發(fā)送了 10W 條數(shù)據(jù),運(yùn)行程序消息正常發(fā)送。

        但這僅僅只是做到了消息發(fā)送,對(duì)消息是否成功送達(dá)完全沒管,等于是純 異步的方式。

        同步

        那么我想知道消息到底發(fā)送成功沒有該怎么辦呢?

        其實(shí) ProducerAPI 已經(jīng)幫我們考慮到了,發(fā)送之后只需要調(diào)用它的 get() 方法即可同步獲取發(fā)送結(jié)果。

        7e6c87fdf4de0cc5129f048d8a7c9056.webp

        發(fā)送結(jié)果:

        32db1a8aa0797f39afd3e33d0656a3fd.webp

        這樣的發(fā)送效率其實(shí)是比較低下的,因?yàn)槊看味夹枰降却l(fā)送的結(jié)果。

        異步

        為此我們應(yīng)當(dāng)采取異步的方式發(fā)送,其實(shí) send() 方法默認(rèn)則是異步的,只要不手動(dòng)調(diào)用 get() 方法。

        但這樣就沒法獲知發(fā)送結(jié)果。

        所以查看 send() 的 API 可以發(fā)現(xiàn)還有一個(gè)參數(shù)。

        1. Future<RecordMetadata> send(ProducerRecord<K, V> producer,Callback callback);

        Callback 是一個(gè)回調(diào)接口,在消息發(fā)送完成之后可以回調(diào)我們自定義的實(shí)現(xiàn)。

        0232d68685260f6c8f1fd295945b07c2.webp

        執(zhí)行之后的結(jié)果:

        3e10c6f9dd3fdb821f08b7b308d6d770.webp

        同樣的也能獲取結(jié)果,同時(shí)發(fā)現(xiàn)回調(diào)的線程并不是上文同步時(shí)的 主線程,這樣也能證明是異步回調(diào)的。

        同時(shí)回調(diào)的時(shí)候會(huì)傳遞兩個(gè)參數(shù):

        • RecordMetadata?和上文一致的消息發(fā)送成功后的元數(shù)據(jù)。

        • Exception?消息發(fā)送過程中的異常信息。

        但是這兩個(gè)參數(shù)并不會(huì)同時(shí)都有數(shù)據(jù),只有發(fā)送失敗才會(huì)有異常信息,同時(shí)發(fā)送元數(shù)據(jù)為空。

        所以正確的寫法應(yīng)當(dāng)是:

        33dd1a4ef54746590f51ddfb5c4caa89.webp

        至于為什么會(huì)只有參數(shù)一個(gè)有值,在下文的源碼分析中會(huì)一一解釋。

        源碼分析

        現(xiàn)在只掌握了基本的消息發(fā)送,想要深刻的理解發(fā)送中的一些參數(shù)配置還是得源碼說了算。

        首先還是來談?wù)勏l(fā)送時(shí)的整個(gè)流程是怎么樣的, Kafka 并不是簡單的把消息通過網(wǎng)絡(luò)發(fā)送到了 broker中,在 Java 內(nèi)部還是經(jīng)過了許多優(yōu)化和設(shè)計(jì)。

        發(fā)送流程

        為了直觀的了解發(fā)送的流程,簡單的畫了幾個(gè)在發(fā)送過程中關(guān)鍵的步驟。

        813c14abc55d92c5d9750eaf31d31efe.webp

        從上至下依次是:

        • 初始化以及真正發(fā)送消息的?kafka-producer-network-thread?IO 線程。

        • 將消息序列化。

        • 得到需要發(fā)送的分區(qū)。

        • 寫入內(nèi)部的一個(gè)緩存區(qū)中。

        • 初始化的 IO 線程不斷的消費(fèi)這個(gè)緩存來發(fā)送消息。

        步驟解析

        接下來詳解每個(gè)步驟。

        初始化

        174b2bea8223613232fee6c61db10f45.webp

        調(diào)用該構(gòu)造方法進(jìn)行初始化時(shí),不止是簡單的將基本參數(shù)寫入 KafkaProducer。比較麻煩的是初始化 Sender 線程進(jìn)行緩沖區(qū)消費(fèi)。

        初始化 IO 線程處:

        59fb64dc04585bef2f04b8fb11587b86.webp

        可以看到 Sender 線程有需要成員變量,比如:

        1. acks,retries,requestTimeout

        等,這些參數(shù)會(huì)在后文分析。

        序列化消息

        在調(diào)用 send() 函數(shù)后其實(shí)第一步就是序列化,畢竟我們的消息需要通過網(wǎng)絡(luò)才能發(fā)送到 Kafka。

        53a902f688d59df7b90521151ebefce9.webp

        其中的 valueSerializer.serialize(record.topic(),record.value()); 是一個(gè)接口,我們需要在初始化時(shí)候指定序列化實(shí)現(xiàn)類。

        f4ff287c7e29a14c31ebe27826422431.webp

        我們也可以自己實(shí)現(xiàn)序列化,只需要實(shí)現(xiàn) org.apache.kafka.common.serialization.Serializer 接口即可。

        路由分區(qū)

        接下來就是路由分區(qū),通常我們使用的 Topic 為了實(shí)現(xiàn)擴(kuò)展性以及高性能都會(huì)創(chuàng)建多個(gè)分區(qū)。

        如果是一個(gè)分區(qū)好說,所有消息都往里面寫入即可。

        但多個(gè)分區(qū)就不可避免需要知道寫入哪個(gè)分區(qū)。

        通常有三種方式。

        指定分區(qū)

        可以在構(gòu)建 ProducerRecord 為每條消息指定分區(qū)。

        35b2f66b5a3240e4f3b5c5dbba3125b6.webp

        這樣在路由時(shí)會(huì)判斷是否有指定,有就直接使用該分區(qū)。

        71dc8e1f9a27f243764570496636182b.webp

        這種一般在特殊場景下會(huì)使用。

        自定義路由策略

        1c45398e418aa29c0067446a29892971.webp

        如果沒有指定分區(qū),則會(huì)調(diào)用 partitioner.partition 接口執(zhí)行自定義分區(qū)策略。

        而我們也只需要自定義一個(gè)類實(shí)現(xiàn) org.apache.kafka.clients.producer.Partitioner 接口,同時(shí)在創(chuàng)建 KafkaProducer 實(shí)例時(shí)配置 partitioner.class 參數(shù)。

        c28d3f5853b6ad5eb943bf07d8dcd4d6.webp

        通常需要自定義分區(qū)一般是在想盡量的保證消息的順序性。

        或者是寫入某些特有的分區(qū),由特別的消費(fèi)者來進(jìn)行處理等。

        默認(rèn)策略

        最后一種則是默認(rèn)的路由策略,如果我們啥都沒做就會(huì)執(zhí)行該策略。

        該策略也會(huì)使得消息分配的比較均勻。

        來看看它的實(shí)現(xiàn):

        7c611bfdf0bcb726a902f62e9e8144f9.webp

        簡單的來說分為以下幾步:

        • 獲取 Topic 分區(qū)數(shù)。

        • 將內(nèi)部維護(hù)的一個(gè)線程安全計(jì)數(shù)器 +1。

        • 與分區(qū)數(shù)取模得到分區(qū)編號(hào)。

        其實(shí)這就是很典型的輪詢算法,所以只要分區(qū)數(shù)不頻繁變動(dòng)這種方式也會(huì)比較均勻。

        寫入內(nèi)部緩存

        send() 方法拿到分區(qū)后會(huì)調(diào)用一個(gè) append() 函數(shù):

        a47409f8150e464bd8fab06b204a8e85.webp

        該函數(shù)中會(huì)調(diào)用一個(gè) getOrCreateDeque() 寫入到一個(gè)內(nèi)部緩存中 batches。

        8411a572eaa86930f19eef5ea0e5c5a7.webp

        消費(fèi)緩存

        在最開始初始化的 IO 線程其實(shí)是一個(gè)守護(hù)線程,它會(huì)一直消費(fèi)這些數(shù)據(jù)。

        5b0a27942b9432e793f74c0dac46e96c.webp

        通過圖中的幾個(gè)函數(shù)會(huì)獲取到之前寫入的數(shù)據(jù)。這塊內(nèi)容可以不必深究,但其中有個(gè) completeBatch 方法卻非常關(guān)鍵。

        b0b39685fdb0941e9c4799aa1e26a549.webp

        調(diào)用該方法時(shí)候肯定已經(jīng)是消息發(fā)送完畢了,所以會(huì)調(diào)用 batch.done() 來完成之前我們?cè)?send() 方法中定義的回調(diào)接口。

        ef5c72059b6fe29ebeff6fac3afb80f2.webp

        從這里也可以看出為什么之前說發(fā)送完成后元數(shù)據(jù)和異常信息只會(huì)出現(xiàn)一個(gè)。

        Producer 參數(shù)解析

        發(fā)送流程講完了再來看看 Producer 中比較重要的幾個(gè)參數(shù)。

        acks

        acks 是一個(gè)影響消息吞吐量的一個(gè)關(guān)鍵參數(shù)。

        1375a39fd1e34d64dca6cde93c1913ed.webp

        主要有 [all、-1,0,1] 這幾個(gè)選項(xiàng),默認(rèn)為 1。

        由于 Kafka 不是采取的主備模式,而是采用類似于 Zookeeper 的主備模式。

        前提是 Topic 配置副本數(shù)量 replica>1

        當(dāng) acks=all/-1 時(shí):

        意味著會(huì)確保所有的 follower 副本都完成數(shù)據(jù)的寫入才會(huì)返回。

        這樣可以保證消息不會(huì)丟失!

        但同時(shí)性能和吞吐量卻是最低的。

        當(dāng) acks=0 時(shí):

        producer 不會(huì)等待副本的任何響應(yīng),這樣最容易丟失消息但同時(shí)性能卻是最好的!

        當(dāng) acks=1 時(shí):

        這是一種折中的方案,它會(huì)等待副本 Leader 響應(yīng),但不會(huì)等到 follower 的響應(yīng)。

        一旦 Leader 掛掉消息就會(huì)丟失。但性能和消息安全性都得到了一定的保證。

        batch.size

        這個(gè)參數(shù)看名稱就知道是內(nèi)部緩存區(qū)的大小限制,對(duì)他適當(dāng)?shù)恼{(diào)大可以提高吞吐量。

        但也不能極端,調(diào)太大會(huì)浪費(fèi)內(nèi)存。小了也發(fā)揮不了作用,也是一個(gè)典型的時(shí)間和空間的權(quán)衡。

        b68d76099b86830925a3aaebeb4d6998.webp

        34788b14dd3c4ff9195760a2d4710668.webp

        上圖是幾個(gè)使用的體現(xiàn)。

        retries

        retries 該參數(shù)主要是來做重試使用,當(dāng)發(fā)生一些網(wǎng)絡(luò)抖動(dòng)都會(huì)造成重試。

        這個(gè)參數(shù)也就是限制重試次數(shù)。

        但也有一些其他問題。

        • 因?yàn)槭侵匕l(fā)所以消息順序可能不會(huì)一致,這也是上文提到就算是一個(gè)分區(qū)消息也不會(huì)是完全順序的情況。

        • 還是由于網(wǎng)絡(luò)問題,本來消息已經(jīng)成功寫入了但是沒有成功響應(yīng)給 producer,進(jìn)行重試時(shí)就可能會(huì)出現(xiàn)?消息重復(fù)。這種只能是消費(fèi)者進(jìn)行冪等處理。

        高效的發(fā)送方式

        如果消息量真的非常大,同時(shí)又需要盡快的將消息發(fā)送到 Kafka。一個(gè) producer 始終會(huì)收到緩存大小等影響。

        那是否可以創(chuàng)建多個(gè) producer 來進(jìn)行發(fā)送呢?

        • 配置一個(gè)最大 producer 個(gè)數(shù)。

        • 發(fā)送消息時(shí)首先獲取一個(gè)?producer,獲取的同時(shí)判斷是否達(dá)到最大上限,沒有就新建一個(gè)同時(shí)保存到內(nèi)部的?List中,保存時(shí)做好同步處理防止并發(fā)問題。

        • 獲取發(fā)送者時(shí)可以按照默認(rèn)的分區(qū)策略使用輪詢的方式獲?。ūWC使用均勻)。

        這樣在大量、頻繁的消息發(fā)送場景中可以提高發(fā)送效率減輕單個(gè) producer 的壓力。

        關(guān)閉 Producer

        最后則是 Producer 的關(guān)閉,Producer 在使用過程中消耗了不少資源(線程、內(nèi)存、網(wǎng)絡(luò)等)因此需要顯式的關(guān)閉從而回收這些資源。

        e05c17ff0719b5c87fcc0486c1e774d1.webp

        默認(rèn)的 close() 方法和帶有超時(shí)時(shí)間的方法都是在一定的時(shí)間后強(qiáng)制關(guān)閉。

        但在過期之前都會(huì)處理完剩余的任務(wù)。

        所以使用哪一個(gè)得視情況而定。

        總結(jié)

        本文內(nèi)容較多,從實(shí)例和源碼的角度分析了 Kafka 生產(chǎn)者。

        希望看完的朋友能有收獲,同時(shí)也歡迎留言討論。

        不出意外下期會(huì)討論 Kafka 消費(fèi)者。

        如果對(duì)你有幫助還請(qǐng)分享讓更多的人看到。

        如果大家想要實(shí)時(shí)關(guān)注我更新的文章以及分享的干貨的話,可以關(guān)注我的公眾號(hào)Java3y

        • ?獲取海量視頻資源5552357276f265ddba3293c953409b2b.webp

        • 獲取Java精美腦圖5552357276f265ddba3293c953409b2b.webp

        • ?獲取Java學(xué)習(xí)路線5552357276f265ddba3293c953409b2b.webp

        • 獲取開發(fā)常用工具5552357276f265ddba3293c953409b2b.webp

        • ?的PDF電子書5552357276f265ddba3293c953409b2b.webp


        在公眾號(hào)下回復(fù)「888」即可獲?。?!

        7a66795e689cc72e5aa2ad208a589f64.webp

        點(diǎn)個(gè)在看e792750660d73340e4e5c674727335ef.webp,分享到朋友圈f49b3f2a87364625eb8cc6827d6e526b.webp,對(duì)我真的很重要?。?/strong>

        瀏覽 25
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            天天综合色综合 | 午夜18 视频在线观看 | 中文字幕亚洲高清 | 艹美女网站| 寂寞少妇让水电工爽hd | 偷窥不良护士 | 明兰乱淫h侵犯h文 | 韩国黄色片网站 | 国产一卡二卡在线 | 看個免費操比的 |