Kafka 萬億級消息實戰(zhàn)
作者:vivo互聯(lián)網(wǎng)服務(wù)器團隊-Yang Yijun
一、Kafka應(yīng)用



//未指定遷移目錄的遷移計劃{"version":1,"partitions":[{"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},{"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},{"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}]}
//指定遷移目錄的遷移計劃{"version":1,"partitions":[{"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},{"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},{"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}]}


/config/users/<user>/clients/<client-id> //根據(jù)用戶和客戶端ID組合限流/config/users/<user>/clients/<default>/config/users/<user>//根據(jù)用戶限流 這種限流方式是我們最常用的方式/config/users/<default>/clients/<client-id>/config/users/<default>/clients/<default>/config/users/<default>/config/clients/<client-id>/config/clients/<default>
(1)消費流量指標(biāo):ObjectName:kafka.server:type=Fetch,user=acl認證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的出流量)、throttle-time(用戶在當(dāng)前broker的出流量被限制時間)(2)生產(chǎn)流量指標(biāo):ObjectName:kafka.server:type=Produce,user=acl認證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的入流量)、throttle-time(用戶在當(dāng)前broker的入流量被限制時間)


//副本同步限流配置共涉及以下4個參數(shù)leader.replication.throttled.ratefollower.replication.throttled.rateleader.replication.throttled.replicasfollower.replication.throttled.replicas
(1)副本同步出流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec(2)副本同步入流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec


Kafka Manager;
Kafka Eagle;
Kafka Monitor;
KafkaOffsetMonitor;









Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//ClientMetricsReporter類實現(xiàn)org.apache.kafka.common.metrics.MetricsReporter接口props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());...

維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、brokerIP; 指標(biāo):連接數(shù)、IO等待時間、生產(chǎn)流量大小、生產(chǎn)記錄數(shù)、請求次數(shù)、請求延時、發(fā)送錯誤/重試次數(shù)等。
維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、消費組、brokerIP、topic分區(qū); 指標(biāo):連接數(shù)、io等待時間、消費流量大小、消費記錄數(shù)、消費延時、topic分區(qū)消費延遲記錄等。
1) Zookeeper進程監(jiān)控;
2) Zookeeper的leader切換監(jiān)控;
3) Zookeeper服務(wù)的錯誤日志監(jiān)控;


必須保證topic分區(qū)leader與follower輪詢的分布在資源組內(nèi)所有broker上,讓流量分布更加均衡,同時需要考慮相同分區(qū)不同副本跨機架分布以提高容災(zāi)能力;
當(dāng)topic分區(qū)leader個數(shù)除以資源組節(jié)點個數(shù)有余數(shù)時,需要把余數(shù)分區(qū)leader優(yōu)先考慮放入流量較低的broker。
擴容智能評估:根據(jù)集群負載,把是否需要擴容評估程序化、智能化;
智能擴容:當(dāng)評估需要擴容后,把擴容流程以及流量均衡平臺化。
一些老化的服務(wù)器需要下線,實現(xiàn)節(jié)點下線平臺化;
服務(wù)器故障,broker故障無法恢復(fù),我們需要下線故障服務(wù)器,實現(xiàn)節(jié)點下線平臺化;
有更優(yōu)配置的服務(wù)器替換已有broker節(jié)點,實現(xiàn)下線節(jié)點平臺化。





1)生成副本遷移計劃以及執(zhí)行遷移任務(wù)平臺化、自動化、智能化; 2)執(zhí)行均衡后broker間流量比較均勻,且單個topic分區(qū)均勻分布在所有broker節(jié)點上;
3)執(zhí)行均衡后broker內(nèi)部多塊磁盤間流量比較均衡;
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference

1)選擇核心指標(biāo)作為生成遷移計劃的依據(jù),比如出流量、入流量、機架、單topic分區(qū)分散性等;
2)優(yōu)化用來生成遷移計劃的指標(biāo)樣本,比如過濾流量突增/突降/掉零等異常樣本;
3)各資源組的遷移計劃需要使用的樣本全部為資源組內(nèi)部樣本,不涉及其他資源組,無交叉;
4)治理單分區(qū)過大topic,讓topic分區(qū)分布更分散,流量不集中在部分broker,讓topic單分區(qū)數(shù)據(jù)量更小,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;
5)已經(jīng)均勻分散在資源組內(nèi)的topic,加入遷移黑名單,不做遷移,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;
6)做topic治理,排除長期無流量topic對均衡的干擾;
7)新建topic或者topic分區(qū)擴容時,應(yīng)讓所有分區(qū)輪詢分布在所有broker節(jié)點,輪詢后余數(shù)分區(qū)優(yōu)先分布流量較低的broker;
8)擴容broker節(jié)點后開啟負載均衡時,優(yōu)先把同一broker分配了同一大流量(流量大而不是存儲空間大,這里可以認為是每秒的吞吐量)topic多個分區(qū)leader的,遷移一部分到新broker節(jié)點;
9)提交遷移任務(wù)時,同一批遷移計劃中的分區(qū)數(shù)據(jù)大小偏差應(yīng)該盡可能小,這樣可以避免遷移任務(wù)中小分區(qū)遷移完成后長時間等待大分區(qū)的遷移,造成任務(wù)傾斜;
(1)生產(chǎn)者權(quán)限認證;
(2)消費者權(quán)限認證;
(3)指定數(shù)據(jù)目錄遷移安全認證;
GitHub地址:https://github.com 精確KIP地址 :https://cwiki.apache.org
num.network.threads創(chuàng)建Processor處理網(wǎng)絡(luò)請求線程個數(shù),建議設(shè)置為broker當(dāng)CPU核心數(shù)*2,這個值太低經(jīng)常出現(xiàn)網(wǎng)絡(luò)空閑太低而缺失副本。num.io.threads創(chuàng)建KafkaRequestHandler處理具體請求線程個數(shù),建議設(shè)置為broker磁盤個數(shù)*2num.replica.fetchers建議設(shè)置為CPU核心數(shù)/4,適當(dāng)提高可以提升CPU利用率及follower同步leader數(shù)據(jù)當(dāng)并行度。compression.type建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量。queued.max.requests如果是生產(chǎn)環(huán)境,建議配置最少500以上,默認為500。log.flush.scheduler.interval.mslog.flush.interval.mslog.flush.interval.messages這幾個參數(shù)表示日志數(shù)據(jù)刷新到磁盤的策略,應(yīng)該保持默認配置,刷盤策略讓操作系統(tǒng)去完成,由操作系統(tǒng)來決定什么時候把數(shù)據(jù)刷盤;如果設(shè)置來這個參數(shù),可能對吞吐量影響非常大;auto.leader.rebalance.enable表示是否開啟leader自動負載均衡,默認true;我們應(yīng)該把這個參數(shù)設(shè)置為false,因為自動負載均衡不可控,可能影響集群性能和穩(wěn)定;
linger.ms#客戶端生產(chǎn)消息等待多久時間才發(fā)送到服務(wù)端,單位:毫秒。和batch.size參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機有丟失數(shù)據(jù)風(fēng)險;batch.size#客戶端發(fā)送到服務(wù)端消息批次大小,和linger.ms參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機有丟失數(shù)據(jù)風(fēng)險;compression.type#建議采用lz4壓縮類型,具備較高的壓縮比及吞吐量;由于Kafka對CPU的要求并不高,所以,可以通過壓縮,充分利用CPU資源以提升網(wǎng)絡(luò)吞吐量;buffer.memory#客戶端緩沖區(qū)大小,如果topic比較大,且內(nèi)存比較充足,可以適當(dāng)調(diào)高這個參數(shù),默認只為33554432(32MB)retries#生產(chǎn)失敗后的重試次數(shù),默認0,可以適當(dāng)增加。當(dāng)重試超過一定次數(shù)后,如果業(yè)務(wù)要求數(shù)據(jù)準(zhǔn)確性較高,建議做容錯處理。retry.backoff.ms#生產(chǎn)失敗后,重試時間間隔,默認100ms,建議不要設(shè)置太大或者太小。
1)topic分區(qū)集中落在某幾個broker節(jié)點上,導(dǎo)致流量副本失衡;
2)導(dǎo)致broker節(jié)點內(nèi)部某幾塊磁盤讀寫超負載,存儲被寫爆;
當(dāng)topic數(shù)據(jù)量非常大時,建議一個分區(qū)開啟一個線程去消費; 對topic消費延時添加監(jiān)控告警,及時發(fā)現(xiàn)處理; 當(dāng)topic數(shù)據(jù)可以丟棄時,遇到超大延時,比如單個分區(qū)延遲記錄超過千萬甚至數(shù)億,那么可以重置topic的消費點位進行緊急處理;【此方案一般在極端場景才使用】 避免重置topic的分區(qū)offset到很早的位置,這可能造成拉取大量歷史數(shù)據(jù);
提升操作效率; 操作出錯概率更小,集群更安全; 所有操作有跡可循,可以追溯;
在平臺上為用戶的topic提供生產(chǎn)樣例數(shù)據(jù)與消費抽樣的功能,用戶可以不用自己寫代碼也可以測試topic是否可以使用,權(quán)限是否正常;
在平臺上為用戶的topic提供生產(chǎn)/消費權(quán)限驗證功能,讓用戶可以明確自己的賬號對某個topic有沒有讀寫權(quán)限;
1)無流量topic的治理,對集群中無流量topic進行清理,減少過多無用元數(shù)據(jù)對集群造成的壓力;
2)topic分區(qū)數(shù)據(jù)大小治理,把topic分區(qū)數(shù)據(jù)量過大的topic(如單分區(qū)數(shù)據(jù)量超過100GB/天)進行梳理,看看是否需要擴容,避免數(shù)據(jù)集中在集群部分節(jié)點上;
3)topic分區(qū)數(shù)據(jù)傾斜治理,避免客戶端在生產(chǎn)消息的時候,指定消息的key,但是key過于集中,消息只集中分布在部分分區(qū),導(dǎo)致數(shù)據(jù)傾斜;
4)topic分區(qū)分散性治理,讓topic分區(qū)分布在集群盡可能多的broker上,這樣可以避免因topic流量突增,流量只集中到少數(shù)節(jié)點上的風(fēng)險,也可以避免某個broker異常對topic影響非常大;
5)topic分區(qū)消費延時治理;一般有延時消費較多的時候有兩種情況,一種是集群性能下降,另外一種是業(yè)務(wù)方的消費并發(fā)度不夠,如果是消費者并發(fā)不夠的化應(yīng)該與業(yè)務(wù)聯(lián)系增加消費并發(fā)。
1)把所有指標(biāo)采集做成平臺可配置,提供統(tǒng)一的指標(biāo)采集和指標(biāo)展示及告警平臺,實現(xiàn)一體化監(jiān)控; 2)把上下游業(yè)務(wù)進行關(guān)聯(lián),做成全鏈路監(jiān)控; 3)用戶可以配置topic或者分區(qū)流量延時、突變等監(jiān)控告警;
1)為我們進行資源申請評估提供依據(jù); 2)讓我們更了解集群的讀寫能力及瓶頸在哪里,針對瓶頸進行優(yōu)化; 3)為我們限流閾值設(shè)置提供依據(jù); 4)為我們評估什么時候應(yīng)該擴容提供依據(jù);
1)為我們創(chuàng)建topic時,評估應(yīng)該指定多少分區(qū)合理提供依據(jù); 2)為我們topic的分區(qū)擴容評估提供依據(jù);
1)為我們了解磁盤的真正讀寫能力,為我們選擇更合適Kafka的磁盤類型提供依據(jù); 2)為我們做磁盤流量告警閾值設(shè)置提供依據(jù);
1)我們需要了解單個集群規(guī)模的上限或者是元數(shù)據(jù)規(guī)模的上限,探索相關(guān)信息對集群性能和穩(wěn)定性的影響; 2)根據(jù)摸底情況,評估集群節(jié)點規(guī)模的合理范圍,及時預(yù)測風(fēng)險,進行超大集群的拆分等工作;
二、開源版本功能缺陷
無法實現(xiàn)增量遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實現(xiàn)了增量遷移】
無法實現(xiàn)并發(fā)遷移;【開源版本直到2.6.0才實現(xiàn)了并發(fā)遷移】
無法實現(xiàn)終止遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實現(xiàn)了終止副本遷移】【開源版本直到2.6.0才實現(xiàn)了暫停遷移,和終止遷移有些不一樣,不會回滾元數(shù)據(jù)】
當(dāng)指定遷移數(shù)據(jù)目錄時,遷移過程中,如果把topic保留時間改短,topic保留時間針對正在遷移topic分區(qū)不生效,topic分區(qū)過期數(shù)據(jù)無法刪除;【開源版本bug,目前還沒有修復(fù)】
當(dāng)指定遷移數(shù)據(jù)目錄時,當(dāng)遷移計劃為以下場景時,整個遷移任務(wù)無法完成遷移,一直處于卡死狀態(tài);【開源版本bug,目前還沒有修復(fù)】
遷移過程中,如果有重啟broker節(jié)點,那個broker節(jié)點上的所有l(wèi)eader分區(qū)無法切換回來,導(dǎo)致節(jié)點流量全部轉(zhuǎn)移到其他節(jié)點,直到所有副本被遷移完畢后leader才會切換回來;【開源版本bug,目前還沒有修復(fù)】。
在原生的Kafka版本中存在以下指定數(shù)據(jù)目錄場景無法遷移完畢的情況,此版本我們也不決定修復(fù)次bug:1.針對同一個topic分區(qū),如果部分目標(biāo)副本相比原副本是所屬broker發(fā)生變化,部分目標(biāo)副本相比原副本是broker內(nèi)部所屬數(shù)據(jù)目錄發(fā)生變化;那么副本所屬broker發(fā)生變化的那個目標(biāo)副本可以正常遷移完畢,目標(biāo)副本是在broker內(nèi)部數(shù)據(jù)目錄發(fā)生變化的無法正常完成遷移;但是舊副本依然可以正常提供生產(chǎn)、消費服務(wù),并且不影響下一次遷移任務(wù)的提交,下一次遷移任務(wù)只需要把此topic分區(qū)的副本列表所屬broker列表變更后提交依然可以正常完成遷移,并且可以清理掉之前未完成的目標(biāo)副本;這里假設(shè)topic yyj1的初始化副本分布情況如下:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}]}//遷移場景1:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}]}//遷移場景2:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}]}針對上述的topic yyj1的分布分布情況,此時如果我們的遷移計劃為“遷移場景1”或遷移場景2“,那么都將出現(xiàn)有副本無法遷移完畢的情況。但是這并不影響舊副本處理生產(chǎn)、消費請求,并且我們可以正常提交其他的遷移任務(wù)。為了清理舊的未遷移完成的副本,我們只需要修改一次遷移計劃【新的目標(biāo)副本列表和當(dāng)前分區(qū)已分配副本列表完全不同即可】,再次提交遷移即可。這里,我們依然以上述的例子做遷移計劃修改如下:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}]}這樣我們就可以正常完成遷移。
/config/users/<user>/clients/<client-id>/config/users/<user>/clients/<default>/config/users/<user>/config/users/<default>/clients/<client-id>/config/users/<default>/clients/<default>/config/users/<default>/config/clients/<client-id>/config/clients/<default>
(1)改造源碼,實現(xiàn)單個broker流量上限限制,只要流量達到broker上限立即進行限流處理,所有往這個broker寫的用戶都可以被限制??;或者對用戶進行優(yōu)先級處理,放過高優(yōu)先級的,限制低優(yōu)先級的;
(2)改造源碼,實現(xiàn)broker上單塊磁盤流量上限限制(很多時候都是流量集中到某幾塊磁盤上,導(dǎo)致沒有達到broker流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達到上限,立即進行限流處理,所有往這個磁盤寫的用戶都可以被限制?。换蛘邔τ脩暨M行優(yōu)先級處理,放過高優(yōu)先級的,限制低優(yōu)先級的;
(3)改造源碼,實現(xiàn)topic維度限流以及對topic分區(qū)的禁寫功能;
(4)改造源碼,實現(xiàn)用戶、broker、磁盤、topic等維度組合精準(zhǔn)限流;
三、kafka發(fā)展趨勢
3.8 Kafka所有KIP地址
四、如何貢獻社區(qū)
2)https://issues.apache.org/jira/secure/BrowseProjects.jspa?selectedCategory=all
評論
圖片
表情
