Kafka在SpringBoot中的實踐
Kafka作為一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),目前已經(jīng)越來越被廣泛的應(yīng)用。這里介紹下如何在Spring Boot下集成、應(yīng)用

環(huán)境搭建
我們使用Docker來進(jìn)行實踐,其中本機IP為192.168.2.101。其實這里還需要一個ZooKeeper實例用于保存元數(shù)據(jù),這里就不贅述ZooKeeper相關(guān)配置了
# 拉取鏡像
docker pull wurstmeister/kafka
# 創(chuàng)建容器
docker run \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # KafKa監(jiān)聽端口
-e KAFKA_ZOOKEEPER_CONNECT=192.168.2.101:2181/kafka \ # ZooKeeper地址、端口
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.101:9092 \ # 暴露給客戶端的地址、端口信息
-d -p 9092:9092 \
--name myKafka \
wurstmeister/kafka
由于wurstmeister/kafka鏡像使用Alpine Linux作為基礎(chǔ)鏡像環(huán)境,其使用的UTC時間與我們本地時間(北京時間)有8個小時的時差。故這里我們介紹下如何在Alpine Linux設(shè)置正確的時區(qū),利用docker exec命令進(jìn)入容器,然后進(jìn)行如下設(shè)置
# 安裝 時區(qū)數(shù)據(jù)
apk add tzdata
# 查看 亞洲可用的時區(qū)
ls /usr/share/zoneinfo/Asia
# 復(fù)制 亞洲/上海 時區(qū) 到 /etc/localtime 下
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
# 設(shè)置時區(qū)為 亞洲/上海
echo "Asia/Shanghai" > /etc/timezone
# 查看當(dāng)前時間,驗證是否生效
date -R
一切配置好了,現(xiàn)在我們通過命令腳本來驗證下看看Kafka是否可以正常工作
生產(chǎn)者操作如下
# 進(jìn)入容器
docker exec -it myKafka /bin/bash
# 切換目錄
cd /opt/kafka/bin
# 使用kafka-console-producer.sh腳本 生產(chǎn)消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 生產(chǎn)消息1
one
# 生產(chǎn)消息2
hell world
消費者操作如下
# 進(jìn)入容器
docker exec -it myKafka /bin/bash
# 切換目錄
cd /opt/kafka/bin
# 使用kafka-console-consumer.sh腳本 消費消息
./kafka-console-consumer.sh --broker-list localhost:9092 --topic test
效果如下所示,符合預(yù)期

SpringBoot集成Kafka
依賴
SpringBoot下使用Kafka很方便,直接添加依賴即可。
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.4</version>
</dependency>
這里我們需要注意SpringBoot與spring-kafka之間的版本兼容性,具體地可以參考官網(wǎng)(https://spring.io/projects/spring-kafka ),下圖紅框、藍(lán)框分別為spring-kafka、SpringBoot的版本要求。這里,我們的SpringBoot版本為2.4.1

其實,關(guān)于spring-kafka版本的選擇問題還有一個小技巧,我們可以在POM依賴中不指定spring-kafka的版本信息,這樣其會自動選擇合適的版本
配置
在 application.properties 中添加相關(guān)的必要配置
# Kafka
# Kafka 地址、端口
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 自定義Kafka分區(qū)器
spring.kafka.producer.properties.partitioner.class=com.aaron.SpringBoot1.Kafka.KafkaPartitioner
# 生產(chǎn)者 key、value的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消費者 key、value的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
實踐
手動聲明一個Topic,并設(shè)置分區(qū)數(shù)為4
@Configuration
public class KafkaConfig {
public static final String TOPIC_ALARM_IN = "topic_alarm_in";
/**
* 聲明Topic,設(shè)置其分區(qū)數(shù)為4
* @return
*/
@Bean
public NewTopic topic1() {
return TopicBuilder.name(TOPIC_ALARM_IN)
.partitions(4)
.build();
}
}
在上文的配置中,我們定義了一個自定義的Kafka分區(qū)器。我們需要實現(xiàn)Partitioner接口,在partition方法中實現(xiàn)我們的分區(qū)邏輯。如下所示
/**
* 自定義Kafka分區(qū)器
*/
public class KafkaPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取該主題的分區(qū)數(shù)量
int size = cluster.partitionsForTopic(topic).size();
// 分區(qū)號
int index = -1;
switch ( (String) key) {
case "996":
index = 1;
break;
case "247":
index = 2;
break;
case "965":
index = 3;
break;
default:
index = 0;
}
return index;
}
@Override
public void close() {
}
}
消息的生產(chǎn)者就比較簡單了,我們直接使用kafkaTemplate發(fā)送即可,這里我們簡單地對其進(jìn)行了封裝
@Component
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaSendResultHandler kafkaSendResultHandler;
public void sendMsg(String topic, String key, String value) {
try{
// 設(shè)置消息發(fā)送結(jié)果的回調(diào)
kafkaTemplate.setProducerListener(kafkaSendResultHandler);
kafkaTemplate.send(topic, key, value);
}catch (Exception e) {
e.printStackTrace();
}
}
}
其中,send方法會返回一個Future,可進(jìn)一步地通過get方法獲取發(fā)送結(jié)果。但我們既希望能夠了解消息發(fā)送是否成功,又不希望被阻塞。幸好Kafka提供了一個回調(diào)接口用于處理發(fā)送結(jié)果,KafkaSendResultHandler實現(xiàn)如下所示
@Component
public class KafkaSendResultHandler implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
String info = "[發(fā)送成功]: ";
String resultStr = buildResult(producerRecord);
System.out.println( info + resultStr );
}
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
String info = "[發(fā)送失敗]: ";
String resultStr = buildResult(producerRecord);
System.out.println( info + resultStr );
exception.printStackTrace();
System.out.println();
}
private String buildResult(ProducerRecord<String, String> producerRecord) {
String topic = producerRecord.topic();
String key = producerRecord.key();
String value = producerRecord.value();
String str = " <topic>: " + topic + ", <key>: " + key + ", <value> :" + value;
return str;
}
}
這里為了簡便,使用一個Controller用于控制消息的發(fā)送,并將id作為key
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Controller
@ResponseBody
@RequestMapping("Kafka")
public class KafkaController {
@Autowired
private Producer producer;
@RequestMapping("/saveAlarmIn")
public String test1(@RequestBody AlarmIn alarmIn) {
System.out.println("\n------------------------------------");
String topic = TOPIC_ALARM_IN;
String key = alarmIn.getId().toString();
try {
String jsonStr = new ObjectMapper().writeValueAsString(alarmIn);
producer.sendMsg(topic, key, jsonStr);
} catch (Exception e) {
e.printStackTrace();
}
return "OK";
}
}
...
/**
* 進(jìn)入告警
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AlarmIn {
/**
* ID
*/
private Integer id;
/**
* 進(jìn)入告警的人員姓名
*/
private String personName;
/**
* 進(jìn)入告警的區(qū)域名稱
*/
private String areaName;
/**
* 告警級別
*/
private Integer level;
}
通過@KafkaListener注解即可實現(xiàn)消息的監(jiān)聽消費,具體地可通過topics、groupId等屬性設(shè)置主題、消費者群組名等信息
@Component
public class Consumer {
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
public void g1c1(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index);
}
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
public void g1c2(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup1] <c2>: alarmIn: " + alarmIn + ", partition: " + index);
}
@KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup2" )
public void g2c3(ConsumerRecord<String, String> record) {
AlarmIn alarmIn = parseAlarmIn(record);
int index = record.partition();
System.out.println("[myGroup2] <c3>: alarmIn: " + alarmIn + ", partition: " + index);
}
/**
* 解析進(jìn)入告警信息
* @param record
* @return
*/
private AlarmIn parseAlarmIn(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
AlarmIn alarmIn = null;
try {
alarmIn = new ObjectMapper().readValue(value, AlarmIn.class);
} catch (Exception e) {
e.printStackTrace();
}
return alarmIn;
}
}
測試結(jié)果如下,符合預(yù)期

Note
Kafka將一個消費群組內(nèi)的所有消費者視為同一個整體,他們均訂閱同一個主題。一條消息只會被群組內(nèi)的一個消費者進(jìn)行消費。但消費者組之間不會相互影響,換言之,如果另外一個消費者組也訂閱了該主題,其同樣也會收到該消息并進(jìn)行處理。上文的測試結(jié)果也佐證了這一點
Kafka中一個主題Topic雖然可以擁有多個分區(qū),但一個分區(qū)不能被同一個消費者群組下的多個消費者進(jìn)行消費。所以當(dāng) 某個消費者群組中消費者的數(shù)量 多于 其訂閱的主題Topic的分區(qū)數(shù) 時,該群組多出來的消費者只會被閑置、浪費

上文我們自定義了一個Kafka的分區(qū)器,事實上這并不是必須的。一方面我們在發(fā)送時可以顯式地將消息發(fā)送到指定的分區(qū);另一方面,如果發(fā)送時未直接指定分區(qū),Kafka也會使用默認(rèn)的分區(qū)器進(jìn)行分區(qū)。具體地,如果key不為null, 默認(rèn)分區(qū)器則通過哈希計算來保證相同key鍵的消息可以被映射到同一個分區(qū)下;如果key為null,默認(rèn)分區(qū)器則使用輪詢算法將消息均衡地分布到各個分區(qū)
參考文獻(xiàn)
Kafka權(quán)威指南 Neha Narkhede/Gwen Shapira/Todd Palino著
