1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        springboot 之集成kafka

        共 10058字,需瀏覽 21分鐘

         ·

        2021-03-08 19:20

        點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

        優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

        76套java從入門到精通實(shí)戰(zhàn)課程分享

        本章只介紹springboot微服務(wù)集成kafka,跟rabbitmq用法相同,作為一個(gè)消息中間件收發(fā)消息使用,本章僅介紹集成后的基礎(chǔ)用法,研究不深,請(qǐng)各位諒解。


        環(huán)境準(zhǔn)備

        IntelliJ IDEA

        前一章中搭建的微服務(wù)框架

        前一章之后,對(duì)目錄結(jié)構(gòu)進(jìn)行了優(yōu)化,將config相關(guān)類都放到demo.config包下

        開始集成

        pom.xml中增加依賴包

        <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
        </dependency


        加入依賴包后最好先執(zhí)行mvn clean install編一把,把所需依賴包下下來,后續(xù)寫代碼的時(shí)候直接就可以引了。


        application.yml中引入kafka相關(guān)配置

        spring:
          kafka:
            bootstrap-servers: 172.101.203.33:9092
            producer:
              # 發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù)。
              retries: 0
              #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
              batch-size: 16384
              # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
              buffer-memory: 33554432
              # 鍵的序列化方式
              key-serializer: org.apache.kafka.common.serialization.StringSerializer
              # 值的序列化方式
              value-serializer: org.apache.kafka.common.serialization.StringSerializer
              # acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。
              # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。
              # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。
              acks: 1
            consumer:
              # 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
              auto-commit-interval: 1S
              # 該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
              # latest(默認(rèn)值)在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
              # earliest :在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
              auto-offset-reset: earliest
              # 是否自動(dòng)提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
              enable-auto-commit: false
              # 鍵的反序列化方式
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              # 值的反序列化方式
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            listener:
              # 在偵聽器容器中運(yùn)行的線程數(shù)。
              concurrency: 5
              #listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
              ack-mode: manual_immediate
              missing-topics-fatal: false


        該配置位于spring下,其中可以配置kafka server的IP:port,producer、consumer、listener的一些配置,可以參考中文注釋了解其作用


        開始寫代碼了:demo下新增kafka包,并在其下面新增producer和consumer


        package com.example.demo.kafka;

        import lombok.extern.slf4j.Slf4j;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.kafka.support.Acknowledgment;
        import org.springframework.kafka.support.KafkaHeaders;
        import org.springframework.messaging.handler.annotation.Header;
        import org.springframework.stereotype.Component;

        import java.util.Optional;

        /**
         * 類功能描述:<br>
         * <ul>
         * <li>類功能描述1<br>
         * <li>類功能描述2<br>
         * <li>類功能描述3<br>
         * </ul>
         * 修改記錄:<br>
         * <ul>
         * <li>修改記錄描述1<br>
         * <li>修改記錄描述2<br>
         * <li>修改記錄描述3<br>
         * </ul>
         *
         * @author xuefl
         * @version 5.0 since 2020-01-13
         */
        @Component
        @Slf4j
        public class KafkaConsumer {

            @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
            public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

                Optional message = Optional.ofNullable(record.value());
                if (message.isPresent()) {
                    Object msg = message.get();
                    log.info("topic_test 消費(fèi)了:Topic:" + topic + ",Message:" + msg);
                    ack.acknowledge();
                }
            }

            @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
            public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

                Optional message = Optional.ofNullable(record.value());
                if (message.isPresent()) {
                    Object msg = message.get();
                    log.info("topic_test1 消費(fèi)了:Topic:" + topic + ",Message:" + msg);
                    ack.acknowledge();
                }
            }
        }



        package com.example.demo.kafka;

        import com.alibaba.fastjson.JSONObject;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.kafka.support.SendResult;
        import org.springframework.stereotype.Component;
        import org.springframework.util.concurrent.ListenableFuture;
        import org.springframework.util.concurrent.ListenableFutureCallback;

        /**
         * 類功能描述:<br>
         * <ul>
         * <li>類功能描述1<br>
         * <li>類功能描述2<br>
         * <li>類功能描述3<br>
         * </ul>
         * 修改記錄:<br>
         * <ul>
         * <li>修改記錄描述1<br>
         * <li>修改記錄描述2<br>
         * <li>修改記錄描述3<br>
         * </ul>
         *
         * @author xuefl
         * @version 5.0 since 2020-01-13
         */
        @Component
        @Slf4j
        public class KafkaProducer {

            @Autowired
            private KafkaTemplate<String, Object> kafkaTemplate;

            //自定義topic
            public static final String TOPIC_TEST = "topic.test";

            //
            public static final String TOPIC_GROUP1 = "topic.group1";

            //
            public static final String TOPIC_GROUP2 = "topic.group2";

            public void send(Object obj) {
                String obj2String = JSONObject.toJSONString(obj);
                log.info("準(zhǔn)備發(fā)送消息為:{}", obj2String);
                //發(fā)送消息
                ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
                future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onFailure(Throwable throwable) {
                        //發(fā)送失敗的處理
                        log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失?。? + throwable.getMessage());
                    }

                    @Override
                    public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                        //成功的處理
                        log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
                    }
                });


            }
        }


        增加測(cè)試controller類,在controller下新建KafkaController類



        測(cè)試結(jié)果


        ————————————————

        版權(quán)聲明:本文為CSDN博主「念念不忘,笑對(duì)人生」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

        原文鏈接:

        https://blog.csdn.net/qq_42715450/article/details/114293390





        鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

        ??????

        ??長(zhǎng)按上方微信二維碼 2 秒





        感謝點(diǎn)贊支持下哈 

        瀏覽 83
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            www.午夜 | 国产91对白刺激露脸在线观看 | 欧美一区二区三区成人片 | 激情五月天婷婷 | 樱花草国产18久久久久 | 国产男男在线观看 | 五月天综合熟妇视频在线观看 | 成人毛片18毛片免费播放 | 蘑菇视频logo | 国产精品美女www视频 |