1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Apache Kafka 快速入門指南

        共 9450字,需瀏覽 19分鐘

         ·

        2020-09-24 15:36

        ?

        「寫在前面」:我是「云祁」,一枚熱愛技術(shù)、會(huì)寫詩的大數(shù)據(jù)開發(fā)猿。昵稱來源于王安石詩中一句 [ 云之祁祁,或雨于淵 ],甚是喜歡。寫博客一方面是對自己學(xué)習(xí)的一點(diǎn)點(diǎn)總結(jié)及記錄,另一方面則是希望能夠幫助更多對大數(shù)據(jù)感興趣的朋友。如果你也對 數(shù)據(jù)中臺(tái)以及 Hadoop / Flink / Spark 等大數(shù)據(jù)技術(shù)感興趣,可以關(guān)注我的博客, 每天都要進(jìn)步一點(diǎn)點(diǎn),生命不是要超越別人,而是要超越自己!(? ?_?)?

        ?

        一、Kafka 是什么?

        有人說世界上有三個(gè)偉大的發(fā)明:火,輪子,以及 Kafka。

        發(fā)展到現(xiàn)在,Apache Kafka 無疑是很成功的,Confluent 公司曾表示世界五百強(qiáng)中有三分之一的企業(yè)在使用 Kafka。在流式計(jì)算中,Kafka 一般用來緩存數(shù)據(jù),例如 Flink 通過消費(fèi) Kafka 的數(shù)據(jù)進(jìn)行計(jì)算。

        關(guān)于Kafka,我們最先需要了解的是以下四點(diǎn):


        1. Apache Kafka 是一個(gè)開源 「消息」 系統(tǒng),由 Scala 寫成。是由 Apache 軟件基金會(huì)開發(fā)的 一個(gè)開源消息系統(tǒng)項(xiàng)目。

        2. Kafka 最初是由 LinkedIn 公司開發(fā),用作 LinkedIn 的活動(dòng)流(Activity Stream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ),現(xiàn)在它已被多家不同類型的公司作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。

        3. 「Kafka 是一個(gè)分布式消息隊(duì)列」。Kafka 對消息保存時(shí)根據(jù) Topic 進(jìn)行歸類,發(fā)送消息 者稱為 Producer,消息接受者稱為 Consumer,此外 kafka 集群有多個(gè) kafka 實(shí)例組成,每個(gè) 實(shí)例(server)稱為 broker。

        4. 無論是 kafka 集群,還是 consumer 都依賴于 「Zookeeper」 集群保存一些 meta 信息, 來保證系統(tǒng)可用性。

        二、為什么要有 Kafka?

        「kafka」 之所以受到越來越多的青睞,與它所扮演的三大角色是分不開的的:

        • 「消息系統(tǒng)」:kafka與傳統(tǒng)的消息中間件都具備系統(tǒng)解耦、冗余存儲(chǔ)、流量削峰、緩沖、異步通信、擴(kuò)展性、可恢復(fù)性等功能。與此同時(shí),kafka還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障及回溯性消費(fèi)的功能。
        • 「存儲(chǔ)系統(tǒng)」:kafka把消息持久化到磁盤,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言,有效的降低了消息丟失的風(fēng)險(xiǎn)。這得益于其消息持久化和多副本機(jī)制。也可以將kafka作為長期的存儲(chǔ)系統(tǒng)來使用,只需要把對應(yīng)的數(shù)據(jù)保留策略設(shè)置為“永久”或啟用主題日志壓縮功能。
        • 「流式處理平臺(tái)」:kafka為流行的流式處理框架提供了可靠的數(shù)據(jù)來源,還提供了一個(gè)完整的流式處理框架,比如窗口、連接、變換和聚合等各類操作。
        Kafka特性
        分布式具備經(jīng)濟(jì)、快速、可靠、易擴(kuò)充、數(shù)據(jù)共享、設(shè)備共享、通訊方便、靈活等,分布式所具備的特性
        高吞吐量同時(shí)為數(shù)據(jù)生產(chǎn)者和消費(fèi)者提高吞吐量
        高可靠性支持多個(gè)消費(fèi)者,當(dāng)某個(gè)消費(fèi)者失敗的時(shí)候,能夠自動(dòng)負(fù)載均衡
        離線能將消息持久化,進(jìn)行批量處理
        解耦作為各個(gè)系統(tǒng)連接的橋梁,避免系統(tǒng)之間的耦合

        三、Kafka 基本概念

        在深入理解 Kafka 之前,可以先了解下 Kafka 的基本概念。

        一個(gè)典型的 Kafka 包含若干Producer、若干 Broker、若干 Consumer 以及一個(gè) Zookeeper 集群。Zookeeper 是 Kafka 用來負(fù)責(zé)集群元數(shù)據(jù)管理、控制器選舉等操作的。Producer 是負(fù)責(zé)將消息發(fā)送到 Broker 的,Broker 負(fù)責(zé)將消息持久化到磁盤,而 Consumer 是負(fù)責(zé)從Broker 訂閱并消費(fèi)消息。Kafka體系結(jié)構(gòu)如下所示:

        概念一:生產(chǎn)者(Producer)與消費(fèi)者(Consumer)

        生產(chǎn)者和消費(fèi)者

        對于 Kafka 來說客戶端有兩種基本類型:「生產(chǎn)者」(Producer)和 「消費(fèi)者」(Consumer)。除此之外,還有用來做數(shù)據(jù)集成的 Kafka Connect API 和流式處理的 「Kafka Streams」 等高階客戶端,但這些高階客戶端底層仍然是生產(chǎn)者和消費(fèi)者API,只不過是在上層做了封裝。

        • 「Producer」 :消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶端;
        • 「Consumer」 :消息消費(fèi)者,向 Kafka broker 取消息的客戶端;

        概念二:Broker 和集群(Cluster)

        一個(gè) Kafka 服務(wù)器也稱為 「Broker」,它接受生產(chǎn)者發(fā)送的消息并存入磁盤;Broker 同時(shí)服務(wù)消費(fèi)者拉取分區(qū)消息的請求,返回目前已經(jīng)提交的消息。使用特定的機(jī)器硬件,一個(gè) Broker 每秒可以處理成千上萬的分區(qū)和百萬量級的消息。

        若干個(gè) Broker 組成一個(gè) 「集群」「Cluster」),其中集群內(nèi)某個(gè) Broker 會(huì)成為集群控制器(Cluster Controller),它負(fù)責(zé)管理集群,包括分配分區(qū)到 Broker、監(jiān)控 Broker 故障等。在集群內(nèi),一個(gè)分區(qū)由一個(gè) Broker 負(fù)責(zé),這個(gè) Broker 也稱為這個(gè)分區(qū)的 Leader;當(dāng)然一個(gè)分區(qū)可以被復(fù)制到多個(gè) Broker 上來實(shí)現(xiàn)冗余,這樣當(dāng)存在 Broker 故障時(shí)可以將其分區(qū)重新分配到其他 Broker 來負(fù)責(zé)。下圖是一個(gè)樣例:

        Broker 和集群(Cluster)

        概念三:主題(Topic)與分區(qū)(Partition)

        主題(Topic)與分區(qū)(Partition)

        在 Kafka 中,消息以 「主題」「Topic」)來分類,每一個(gè)主題都對應(yīng)一個(gè)「「消息隊(duì)列」」,這有點(diǎn)兒類似于數(shù)據(jù)庫中的表。但是如果我們把所有同類的消息都塞入到一個(gè)“中心”隊(duì)列中,勢必缺少可伸縮性,無論是生產(chǎn)者/消費(fèi)者數(shù)目的增加,還是消息數(shù)量的增加,都可能耗盡系統(tǒng)的性能或存儲(chǔ)。

        我們使用一個(gè)生活中的例子來說明:現(xiàn)在 A 城市生產(chǎn)的某商品需要運(yùn)輸?shù)?B 城市,走的是公路,那么單通道的高速公路不論是在「A 城市商品增多」還是「現(xiàn)在 C 城市也要往 B 城市運(yùn)輸東西」這樣的情況下都會(huì)出現(xiàn)「吞吐量不足」的問題。所以我們現(xiàn)在引入 「分區(qū)」「Partition」)的概念,類似“允許多修幾條道”的方式對我們的主題完成了水平擴(kuò)展。

        四、Kafka 工作流程分析

        4.1 Kafka 生產(chǎn)過程分析

        4.1.1 寫入方式

        producer 采用推(push)模式將消息發(fā)布到 broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障 kafka 吞吐率)

        4.1.2 分區(qū)(Partition)

        消息發(fā)送時(shí)都被發(fā)送到一個(gè) topic,其本質(zhì)就是一個(gè)目錄,而 topic 是由一些 Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:

        我們可以看到,每個(gè) Partition 中的消息都是 「有序」 的,生產(chǎn)的消息被不斷追加到 Partition log 上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的 「offset」 值。

        「1)分區(qū)的原因」

        1. 方便在集群中擴(kuò)展,每個(gè) Partition 可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè) topic 又可以有多個(gè) Partition 組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
        2. 可以提高并發(fā),因?yàn)榭梢砸?Partition 為單位讀寫了。

        「2)分區(qū)的原則」

        1. 指定了 patition,則直接使用;
        2. 未指定 patition 但指定 key,通過對 key 的 value 進(jìn)行 hash 出一個(gè) patition;
        3. patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
        DefaultPartitioner?類?
        public?int?partition(String?topic,?Object?key,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster)?{?
        ?List?partitions?=?cluster.partitionsForTopic(topic);?
        ?int?numPartitions?=?partitions.size();?
        ?if?(keyBytes?==?null)?{
        ???int?nextValue?=?nextValue(topic);?
        ???List?availablePartitions?=?cluster.availablePartitionsForTopic(topic);
        ???if?(availablePartitions.size()?>?0)?{?
        ???int?part?=?Utils.toPositive(nextValue)?%?availablePartitions.size();?
        ???return?availablePartitions.get(part).partition();
        ????}?else?{?
        ????//?no?partitions?are?available,?give?a?non-available?partition?
        ????return?Utils.toPositive(nextValue)?%?numPartitions;?
        ????}?
        ????}?else?{?
        ????//?hash?the?keyBytes?to?choose?a?partition?
        ????return?Utils.toPositive(Utils.murmur2(keyBytes))?%?numPartitions;?
        ????}
        ?}

        4.1.3 副本(Replication)

        同 一 個(gè) partition 可 能 會(huì) 有 多 個(gè) replication ( 對 應(yīng) server.properties 配 置 中 的 default.replication.factor=N)。沒有 replication 的情況下,一旦 broker 宕機(jī),其上所有 patition 的數(shù)據(jù)都不可被消費(fèi),同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition。引入 replication 之后,同一個(gè) partition 可能會(huì)有多個(gè) replication,而這時(shí)需要在這些 replication 之間選出一 個(gè) leader,producer 和 consumer 只與這個(gè) leader 交互,其它 replication 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。

        4.1.4 寫入流程

        producer 寫入消息流程如下:

        1)producer 先從 zookeeper 的 "/brokers/.../state"節(jié)點(diǎn)找到該 partition 的 leader ;

        2)producer 將消息發(fā)送給該 leader ;

        3)leader 將消息寫入本地 log ;

        4)followers 從 leader pull 消息,寫入本地 log 后向 leader 發(fā)送 ACK ;

        5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發(fā)送 ACK ;

        4.2 Broker 保存消息

        4.2.1 存儲(chǔ)方式

        物理上把 topic 分成一個(gè)或多個(gè) patition(對應(yīng) server.properties 中的 num.partitions=3 配 置),每個(gè) patition 物理上對應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文 件),如下:

        [root@hadoop102?logs]$?ll?
        drwxrwxr-x.?2?demo?demo?4096?8?月?6?14:37?first-0?
        drwxrwxr-x.?2?demo?demo?4096?8?月?6?14:35?first-1?
        drwxrwxr-x.?2?demo?demo?4096?8?月?6?14:37?first-2?

        [root@hadoop102?logs]$?cd?first-0?
        [root@hadoop102?first-0]$?ll?
        -rw-rw-r--.?1?demo?demo?10485760?8?月?6?14:33?00000000000000000000.index?
        -rw-rw-r--.?1?demo?demo?219?8?月?6?15:07?00000000000000000000.log?
        -rw-rw-r--.?1?demo?demo?10485756?8?月?6?14:33?00000000000000000000.timeindex?
        -rw-rw-r--.?1?demo?demo?8?8?月?6?14:37?leader-epoch-checkpoint

        4.2.2 ?存儲(chǔ)策略

        無論消息是否被消費(fèi),kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

        • 基于時(shí)間:log.retention.hours=168
        • 基于大小:log.retention.bytes=1073741824

        需要注意的是,因?yàn)?Kafka 讀取特定消息的時(shí)間復(fù)雜度為 O(1),即與文件大小無關(guān), 所以這里刪除過期文件與提高 Kafka 性能無關(guān)。

        4.2.3 Zookeeper 存儲(chǔ)結(jié)構(gòu)

        注意:producer 不在 zk 中注冊,消費(fèi)者在 zk 中注冊。

        4.3 Kafka 消費(fèi)過程分析

        kafka 提供了兩套 consumer API:高級 Consumer API 和低級 Consumer API。

        4.3.1 高級 API

        「1)高級 API 優(yōu)點(diǎn)」

        • 高級 API 寫起來簡單

        • 不需要自行去管理 offset,系統(tǒng)通過 zookeeper 自行管理。

        • 不需要管理分區(qū),副本等情況,系統(tǒng)自動(dòng)管理。

        • 消費(fèi)者斷線會(huì)自動(dòng)根據(jù)上一次記錄在 zookeeper 中的 offset 去接著獲取數(shù)據(jù)(默認(rèn)設(shè)置 1 分鐘更新一下 zookeeper 中存的 offset)

        • 可以使用 group 來區(qū)分對同一個(gè) topic 的不同程序訪問分離開來(不同的 group 記錄不同的 offset,這樣不同程序讀取同一個(gè) topic 才不會(huì)因?yàn)?offset 互相影響)

        「2)高級 API 缺點(diǎn)」

        • 不能自行控制 offset(對于某些特殊需求來說)
        • 不能細(xì)化控制如分區(qū)、副本、zk 等

        4.3.2 低級 API

        「1)低級 API 優(yōu)點(diǎn)」

        • 能夠讓開發(fā)者自己控制 offset,想從哪里讀取就從哪里讀取。
        • 自行控制連接分區(qū),對分區(qū)自定義進(jìn)行負(fù)載均衡
        • 對 zookeeper 的依賴性降低(如:offset 不一定非要靠 zk 存儲(chǔ),自行存儲(chǔ) offset 即可, 比如存在文件或者內(nèi)存中)

        「2)低級 API 缺點(diǎn)」

        • 太過復(fù)雜,需要自行控制 offset,連接哪個(gè)分區(qū),找到分區(qū) leader 等。

        4.3.3 消費(fèi)者組

        消費(fèi)者是以 consumer group 消費(fèi)者組的方式工作,由一個(gè)或者多個(gè)消費(fèi)者組成一個(gè)組, 共同消費(fèi)一個(gè) topic。每個(gè)分區(qū)在同一時(shí)間只能由 group 中的一個(gè)消費(fèi)者讀取,但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition。在圖中,有一個(gè)由三個(gè)消費(fèi)者組成的 group,有一個(gè)消費(fèi)者讀取主題中的兩個(gè)分區(qū),另外兩個(gè)分別讀取一個(gè)分區(qū)。某個(gè)消費(fèi)者讀取某個(gè)分區(qū),也可以叫做某個(gè)消費(fèi)者是某個(gè)分區(qū)的擁有者。

        在這種情況下,消費(fèi)者可以通過水平擴(kuò)展的方式同時(shí)讀取大量的消息。另外,如果一個(gè)消費(fèi)者失敗了,那么其他的 group 成員會(huì)自動(dòng)負(fù)載均衡讀取之前失敗的消費(fèi)者讀取的分區(qū)。

        4.3.4 消費(fèi)方式

        consumer 采用 pull(拉)模式從 broker 中讀取數(shù)據(jù)。

        push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。

        對于 Kafka 而言,pull 模式更合適,它可簡化 broker 的設(shè)計(jì),consumer 可自主控制消費(fèi) 消息的速率,同時(shí) consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義。

        pull 模式不足之處是,如果 kafka 沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直等待數(shù)據(jù) 到達(dá)。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費(fèi)者請求在等待數(shù)據(jù)到達(dá) 的“長輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大?。?。

        五、Kafka 安裝

        5.1 安裝環(huán)境與前提條件

        安裝環(huán)境:Linux

        前提條件:

        Linux系統(tǒng)下安裝好jdk 1.8以上版本,正確配置環(huán)境變量 Linux系統(tǒng)下安裝好scala 2.11版本

        安裝ZooKeeper(注:kafka自帶一個(gè)Zookeeper服務(wù),如果不單獨(dú)安裝,也可以使用自帶的ZK)

        5.2 安裝步驟

        Apache基金會(huì)開源的這些軟件基本上安裝都比較方便,只需要下載、解壓、配置環(huán)境變量三步即可完成,kafka也一樣,官網(wǎng)選擇對應(yīng)版本下載后直接解壓到一個(gè)安裝目錄下就可以使用了,如果為了方便可以在~/.bashrc里配置一下環(huán)境變量,這樣使用的時(shí)候就不需要每次都切換到安裝目錄了。

        具體可參考:Kafka 集群安裝與環(huán)境測試

        5.3 測試

        接下來可以通過簡單的console窗口來測試kafka是否安裝正確。

        「(1)首先啟動(dòng)ZooKeeper服務(wù)」

        如果啟動(dòng)自己安裝的ZooKeeper,使用命令zkServer.sh start即可。

        如果使用kafka自帶的ZK服務(wù),啟動(dòng)命令如下(啟動(dòng)之后shell不會(huì)返回,后續(xù)其他命令需要另開一個(gè)Terminal):

        $?cd?/opt/tools/kafka?#進(jìn)入安裝目錄
        $?bin/zookeeper-server-start.sh?config/zookeeper.properties

        「(2)第二步啟動(dòng)kafka服務(wù)」

        啟動(dòng)Kafka服務(wù)的命令如下所示:

        $?cd?/opt/tools/kafka?#進(jìn)入安裝目錄
        $?bin/kafka-server-start.sh?config/server.properties

        「(3)第三步創(chuàng)建一個(gè)topic,假設(shè)為“test”」

        創(chuàng)建topic的命令如下所示,其參數(shù)也都比較好理解,依次指定了依賴的ZooKeeper,副本數(shù)量,分區(qū)數(shù)量,topic的名字:

        $?cd?/opt/tools/kafka?#進(jìn)入安裝目錄
        $?bin/kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?1?--partitions?1?--topic?test1

        創(chuàng)建完成后,可以通過如下所示的命令查看topic列表:

        $?bin/kafka-topics.sh?--list?--zookeeper?localhost:2181?

        「(4)開啟Producer和Consumer服務(wù)」

        kafka提供了生產(chǎn)者和消費(fèi)者對應(yīng)的console窗口程序,可以先通過這兩個(gè)console程序來進(jìn)行驗(yàn)證。

        首先啟動(dòng)Producer:

        $?cd?/opt/tools/kafka?#進(jìn)入安裝目錄
        $?bin/kafka-console-producer.sh?--broker-list?localhost:9092?--topic?test

        然后啟動(dòng)Consumer:

        $?cd?/opt/tools/kafka?#進(jìn)入安裝目錄
        $?bin/kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--topic?test?--from-beginning

        在打開生產(chǎn)者服務(wù)的終端輸入一些數(shù)據(jù),回車后,在打開消費(fèi)者服務(wù)的終端能看到生產(chǎn)者終端輸入的數(shù)據(jù),即說明kafka安裝成功。

        六、Apache Kafka 簡單示例

        6.1 創(chuàng)建消息隊(duì)列

        kafka-topics.sh?--create?--zookeeper?192.168.56.137:2181?--topic?test?--replication-factor?1?--partitions?1

        6.2 pom.xml



        ????org.apache.kafka
        ????kafka-clients
        ????2.1.1

        6.3 生產(chǎn)者

        package?com.njbdqn.services;

        import?org.apache.kafka.clients.producer.KafkaProducer;
        import?org.apache.kafka.clients.producer.ProducerConfig;
        import?org.apache.kafka.clients.producer.ProducerRecord;
        import?org.apache.kafka.common.serialization.StringSerializer;

        import?java.util.Properties;

        /**
        ?*?@author:Tokgo J
        ?*?@date:2020/9/11
        ?*?@aim:生產(chǎn)者:往test消息隊(duì)列寫入消息
        ?*/


        public?class?MyProducer?{
        ????public?static?void?main(String[]?args)?{
        ????????//?定義配置信息
        ????????Properties?prop?=?new?Properties();
        ????????//?kafka地址,多個(gè)地址用逗號分割??"192.168.23.76:9092,192.168.23.77:9092"
        ????????prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
        ????????prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,?StringSerializer.class);
        ????????prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        ????????KafkaProducer?prod?=?new?KafkaProducer(prop);

        ????????//?發(fā)送消息
        ????????try?{
        ????????????for(int?i=0;i<10;i++)?{
        ????????????????//?生產(chǎn)者記錄消息
        ????????????????ProducerRecord?pr?=?new?ProducerRecord("test",?"hello?world"+i);
        ????????????????prod.send(pr);
        ????????????????Thread.sleep(500);
        ????????????}
        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}?finally?{
        ????????????prod.close();
        ????????}
        ????}
        }

        注意:

        1. kafka如果是集群,多個(gè)地址用逗號分割(,) ;
        2. Properties的put方法,第一個(gè)參數(shù)可以是字符串,如:p.put("bootstrap.servers","192.168.23.76:9092")
        3. kafkaProducer.send(record)可以通過返回的Future來判斷是否已經(jīng)發(fā)送到kafka,增強(qiáng)消息的可靠性。同時(shí)也可以使用send的第二個(gè)參數(shù)來回調(diào),通過回調(diào)判斷是否發(fā)送成功;
        4. p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 設(shè)置序列化類,可以寫類的全路徑。

        6.4 消費(fèi)者

        package?com.njbdqn.services;

        import?org.apache.kafka.clients.consumer.ConsumerConfig;
        import?org.apache.kafka.clients.consumer.ConsumerRecord;
        import?org.apache.kafka.clients.consumer.ConsumerRecords;
        import?org.apache.kafka.clients.consumer.KafkaConsumer;
        import?org.apache.kafka.common.serialization.StringDeserializer;
        import?org.apache.kafka.common.serialization.StringSerializer;

        import?java.time.Duration;
        import?java.util.Arrays;
        import?java.util.Collections;
        import?java.util.Properties;

        /**
        ?*?@author:Tokgo J
        ?*?@date:2020/9/11
        ?*?@aim:消費(fèi)者:讀取kafka數(shù)據(jù)
        ?*/


        public?class?MyConsumer?{
        ????public?static?void?main(String[]?args)?{
        ????????Properties?prop?=?new?Properties();
        ????????prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,?"192.168.56.137:9092");
        ????????prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,?StringDeserializer.class);
        ????????prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,?StringDeserializer.class);

        ????????prop.put("session.timeout.ms",?"30000");
        ????????//消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)是true?避免出現(xiàn)重復(fù)數(shù)據(jù)?設(shè)為false
        ????????prop.put("enable.auto.commit",?"false");
        ????????prop.put("auto.commit.interval.ms",?"1000");
        ????????//auto.offset.reset?消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下的處理
        ????????//earliest?在偏移量無效的情況下?消費(fèi)者將從起始位置讀取分區(qū)的記錄
        ????????//latest?在偏移量無效的情況下?消費(fèi)者將從最新位置讀取分區(qū)的記錄
        ????????prop.put("auto.offset.reset",?"earliest");

        ????????//?設(shè)置組名
        ????????prop.put(ConsumerConfig.GROUP_ID_CONFIG,?"group");

        ????????KafkaConsumer?con?=?new?KafkaConsumer(prop);

        ????????con.subscribe(Collections.singletonList("test"));

        ????????while?(true)?{
        ????????????ConsumerRecords?records?=?con.poll(Duration.ofSeconds(100));
        ????????????for?(ConsumerRecord?rec?:?records)?{
        ????????????????System.out.println(String.format("offset:%d,key:%s,value:%s",?rec.offset(),?rec.key(),?rec.value()));

        ????????????}
        ????????}
        ????}
        }

        注意:

        1. 訂閱消息可以訂閱多個(gè)主題;
        2. ConsumerConfig.GROUP_ID_CONFIG 表示消費(fèi)者的分組,kafka根據(jù)分組名稱判斷是不是同一組消費(fèi)者,同一組消費(fèi)者去消費(fèi)一個(gè)主題的數(shù)據(jù)的時(shí)候,數(shù)據(jù)將在這一組消費(fèi)者上面輪詢;
        3. 主題涉及到分區(qū)的概念,同一組消費(fèi)者的個(gè)數(shù)不能大于分區(qū)數(shù)。因?yàn)椋阂粋€(gè)分區(qū)只能被同一群組的一個(gè)消費(fèi)者消費(fèi)。出現(xiàn)分區(qū)小于消費(fèi)者個(gè)數(shù)的時(shí)候,可以動(dòng)態(tài)增加分區(qū);
        4. 注意和生產(chǎn)者的對比,Properties中的key和value是反序列化,而生產(chǎn)者是序列化。

        七、參考

        朱小廝:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》

        宇宙灣:《Apache Kafka 分布式消息隊(duì)列框架》

        —?【 THE END 】—
        本公眾號全部博文已整理成一個(gè)目錄,請?jiān)诠娞柪锘貜?fù)「m」獲??!


        3T技術(shù)資源大放送!包括但不限于:Java、C/C++,Linux,Python,大數(shù)據(jù),人工智能等等。在公眾號內(nèi)回復(fù)「1024」,即可免費(fèi)獲?。?!




        瀏覽 61
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩免费无码视频 | 高清无码在线观看黄片 | 一男一女操逼视频 | 老司机视频在线视频18 | 堆萌操逼网站 | 国产精品久久久久久久久久清纯 | 涩爱av色老久久精品偷偷鲁 | 伊人A∨视频 | 做爰 视频毛片下载蜜桃视频 | 日本啪啪一级视频 |