1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Kafka在SpringBoot中的實踐

        共 15342字,需瀏覽 31分鐘

         ·

        2021-05-23 22:14

        Kafka作為一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),目前已經(jīng)越來越被廣泛的應(yīng)用。這里介紹下如何在Spring Boot下集成、應(yīng)用

        abstract.png

        環(huán)境搭建

        我們使用Docker來進(jìn)行實踐,其中本機IP為192.168.2.101。其實這里還需要一個ZooKeeper實例用于保存元數(shù)據(jù),這里就不贅述ZooKeeper相關(guān)配置了

        # 拉取鏡像
        docker pull wurstmeister/kafka

        # 創(chuàng)建容器  
        docker run \
         -e KAFKA_BROKER_ID=1 \
         -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # KafKa監(jiān)聽端口
         -e KAFKA_ZOOKEEPER_CONNECT=192.168.2.101:2181/kafka \ # ZooKeeper地址、端口
         -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.101:9092 \ # 暴露給客戶端的地址、端口信息
         -d -p 9092:9092 \
         --name myKafka \
         wurstmeister/kafka

        由于wurstmeister/kafka鏡像使用Alpine Linux作為基礎(chǔ)鏡像環(huán)境,其使用的UTC時間與我們本地時間(北京時間)有8個小時的時差。故這里我們介紹下如何在Alpine Linux設(shè)置正確的時區(qū),利用docker exec命令進(jìn)入容器,然后進(jìn)行如下設(shè)置

        # 安裝 時區(qū)數(shù)據(jù)
        apk add tzdata
        # 查看 亞洲可用的時區(qū)
        ls /usr/share/zoneinfo/Asia
        # 復(fù)制 亞洲/上海 時區(qū) 到 /etc/localtime 下
        cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
        # 設(shè)置時區(qū)為 亞洲/上海
        echo "Asia/Shanghai" > /etc/timezone
        # 查看當(dāng)前時間,驗證是否生效
        date -R

        一切配置好了,現(xiàn)在我們通過命令腳本來驗證下看看Kafka是否可以正常工作

        生產(chǎn)者操作如下

        # 進(jìn)入容器
        docker exec -it myKafka /bin/bash
        # 切換目錄
        cd /opt/kafka/bin
        # 使用kafka-console-producer.sh腳本 生產(chǎn)消息
        ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
        # 生產(chǎn)消息1
        one
        # 生產(chǎn)消息2
        hell world

        消費者操作如下

        # 進(jìn)入容器
        docker exec -it myKafka /bin/bash
        # 切換目錄
        cd /opt/kafka/bin
        # 使用kafka-console-consumer.sh腳本 消費消息
        ./kafka-console-consumer.sh --broker-list localhost:9092 --topic test

        效果如下所示,符合預(yù)期

        figure 1.jpeg

        SpringBoot集成Kafka

        依賴

        SpringBoot下使用Kafka很方便,直接添加依賴即可。

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.6.4</version>
        </dependency>

        這里我們需要注意SpringBoot與spring-kafka之間的版本兼容性,具體地可以參考官網(wǎng)(https://spring.io/projects/spring-kafka ),下圖紅框、藍(lán)框分別為spring-kafka、SpringBoot的版本要求。這里,我們的SpringBoot版本為2.4.1

        figure 2.jpeg

        其實,關(guān)于spring-kafka版本的選擇問題還有一個小技巧,我們可以在POM依賴中不指定spring-kafka的版本信息,這樣其會自動選擇合適的版本

        配置

        在 application.properties 中添加相關(guān)的必要配置

        # Kafka
        # Kafka 地址、端口
        spring.kafka.bootstrap-servers=127.0.0.1:9092
        # 自定義Kafka分區(qū)器
        spring.kafka.producer.properties.partitioner.class=com.aaron.SpringBoot1.Kafka.KafkaPartitioner
        # 生產(chǎn)者 key、value的序列化器
        spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
        spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
        # 消費者 key、value的反序列化器
        spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
        spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

        實踐

        手動聲明一個Topic,并設(shè)置分區(qū)數(shù)為4

        @Configuration
        public class KafkaConfig {

            public static final String TOPIC_ALARM_IN = "topic_alarm_in";

            /**
             * 聲明Topic,設(shè)置其分區(qū)數(shù)為4
             * @return
             */

            @Bean
            public NewTopic topic1() {
                return TopicBuilder.name(TOPIC_ALARM_IN)
                        .partitions(4)
                        .build();
            }
        }

        在上文的配置中,我們定義了一個自定義的Kafka分區(qū)器。我們需要實現(xiàn)Partitioner接口,在partition方法中實現(xiàn)我們的分區(qū)邏輯。如下所示

        /**
         * 自定義Kafka分區(qū)器
         */

        public class KafkaPartitioner implements Partitioner {

            @Override
            public void configure(Map<String, ?> configs) {
            }

            @Override
            public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

                // 獲取該主題的分區(qū)數(shù)量
                int size = cluster.partitionsForTopic(topic).size();
                // 分區(qū)號
                int index = -1;
                switch ( (String) key) {
                    case "996":
                        index = 1;
                        break;
                    case "247":
                        index = 2;
                        break;
                    case "965":
                        index = 3;
                        break;
                    default:
                        index = 0;
                }

                return index;
            }

            @Override
            public void close() {
            }
        }

        消息的生產(chǎn)者就比較簡單了,我們直接使用kafkaTemplate發(fā)送即可,這里我們簡單地對其進(jìn)行了封裝

        @Component
        public class Producer {

            @Autowired
            private KafkaTemplate<String, String> kafkaTemplate;

            @Autowired
            private KafkaSendResultHandler kafkaSendResultHandler;

            public void sendMsg(String topic, String key, String value) {
                try{
                    // 設(shè)置消息發(fā)送結(jié)果的回調(diào)
                    kafkaTemplate.setProducerListener(kafkaSendResultHandler);
                    kafkaTemplate.send(topic, key, value);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        其中,send方法會返回一個Future,可進(jìn)一步地通過get方法獲取發(fā)送結(jié)果。但我們既希望能夠了解消息發(fā)送是否成功,又不希望被阻塞。幸好Kafka提供了一個回調(diào)接口用于處理發(fā)送結(jié)果,KafkaSendResultHandler實現(xiàn)如下所示

        @Component
        public class KafkaSendResultHandler implements ProducerListener {

            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                String info = "[發(fā)送成功]: ";
                String resultStr = buildResult(producerRecord);
                System.out.println( info + resultStr );
            }

            @Override
            public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
                String info = "[發(fā)送失敗]: ";
                String resultStr = buildResult(producerRecord);
                System.out.println( info + resultStr );
                exception.printStackTrace();
                System.out.println();
            }

            private String buildResult(ProducerRecord<String, String> producerRecord) {
                String topic = producerRecord.topic();
                String key = producerRecord.key();
                String value = producerRecord.value();
                String str = " <topic>: " + topic + ", <key>: " + key + ", <value> :" + value;
                return str;
            }
        }

        這里為了簡便,使用一個Controller用于控制消息的發(fā)送,并將id作為key

        @RequiredArgsConstructor(onConstructor = @__(@Autowired))
        @Controller
        @ResponseBody
        @RequestMapping("Kafka")
        public class KafkaController {

            @Autowired
            private Producer producer;

            @RequestMapping("/saveAlarmIn")
            public String test1(@RequestBody AlarmIn alarmIn) {
                System.out.println("\n------------------------------------");
                String topic = TOPIC_ALARM_IN;
                String key = alarmIn.getId().toString();
                try {
                    String jsonStr = new ObjectMapper().writeValueAsString(alarmIn);
                    producer.sendMsg(topic, key, jsonStr);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "OK";
            }

        }

        ...

        /**
         * 進(jìn)入告警
         */

        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        @Builder
        public class AlarmIn {
            /**
             * ID
             */

            private Integer id;

            /**
             * 進(jìn)入告警的人員姓名
             */

            private String personName;

            /**
             * 進(jìn)入告警的區(qū)域名稱
             */

            private String areaName;

            /**
             * 告警級別
             */

            private Integer level;

        }

        通過@KafkaListener注解即可實現(xiàn)消息的監(jiān)聽消費,具體地可通過topics、groupId等屬性設(shè)置主題、消費者群組名等信息

        @Component
        public class Consumer {

            @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
            public void g1c1(ConsumerRecord<String, String> record) {
                AlarmIn alarmIn = parseAlarmIn(record);
                int index = record.partition();
                System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index);
            }

            @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
            public void g1c2(ConsumerRecord<String, String> record) {
                AlarmIn alarmIn = parseAlarmIn(record);
                int index = record.partition();
                System.out.println("[myGroup1] <c2>: alarmIn: " + alarmIn + ", partition: " + index);
            }

            @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup2" )
            public void g2c3(ConsumerRecord<String, String> record) {
                AlarmIn alarmIn = parseAlarmIn(record);
                int index = record.partition();
                System.out.println("[myGroup2] <c3>: alarmIn: " + alarmIn + ", partition: " + index);
            }

            /**
             * 解析進(jìn)入告警信息
             * @param record
             * @return
             */

            private AlarmIn parseAlarmIn(ConsumerRecord<String, String> record) {
                String key = record.key();
                String value = record.value();

                AlarmIn alarmIn = null;
                try {
                    alarmIn = new ObjectMapper().readValue(value, AlarmIn.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return alarmIn;
            }

        }

        測試結(jié)果如下,符合預(yù)期

        figure 3.jpeg

        Note

        1. Kafka將一個消費群組內(nèi)的所有消費者視為同一個整體,他們均訂閱同一個主題。一條消息只會被群組內(nèi)的一個消費者進(jìn)行消費。但消費者組之間不會相互影響,換言之,如果另外一個消費者組也訂閱了該主題,其同樣也會收到該消息并進(jìn)行處理。上文的測試結(jié)果也佐證了這一點

        2. Kafka中一個主題Topic雖然可以擁有多個分區(qū),但一個分區(qū)不能被同一個消費者群組下的多個消費者進(jìn)行消費。所以當(dāng) 某個消費者群組中消費者的數(shù)量 多于 其訂閱的主題Topic的分區(qū)數(shù) 時,該群組多出來的消費者只會被閑置、浪費

        figure 4.jpeg
        1. 上文我們自定義了一個Kafka的分區(qū)器,事實上這并不是必須的。一方面我們在發(fā)送時可以顯式地將消息發(fā)送到指定的分區(qū);另一方面,如果發(fā)送時未直接指定分區(qū),Kafka也會使用默認(rèn)的分區(qū)器進(jìn)行分區(qū)。具體地,如果key不為null, 默認(rèn)分區(qū)器則通過哈希計算來保證相同key鍵的消息可以被映射到同一個分區(qū)下;如果key為null,默認(rèn)分區(qū)器則使用輪詢算法將消息均衡地分布到各個分區(qū)

        參考文獻(xiàn)

        1. Kafka權(quán)威指南 Neha Narkhede/Gwen Shapira/Todd Palino著
        瀏覽 61
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            92看片淫黄大片看国产片图片 | 看污片网站 | 爱爱短视频电影无码免费 | 国产成人精品区一二三影院竹菊 | 亚洲爆乳无码精品AAA片蜜桃 | 午夜操屄视频 | 91免费三级片网站 | 91aiai| 蜜桃91精品入口 | 大美女大香蕉网页 |