celery-mq-assistantMQ 助手
MQ助手 - 是一個基于pulsar包自研實現(xiàn)的Spring Boot Stater。通過簡單注解配置即可進行MQ消息生產(chǎn)與消費。
主要特性
- 基于成熟pulsar包擴展,沒有任何框架變動,只為簡化開發(fā)使用
- 配置簡單靈活,無需復(fù)雜的配置文件:開發(fā)者可以快速注解類或者方法實現(xiàn)消息的生產(chǎn)與消費
- Apache Pulsar 云原生分布式消息流平臺,當下最佳解決方案
使用指引
引入依賴
implementation("cool.doudou:mq-assistant:latest")
Pulsar配置
pulsar:
service-url: pulsar://127.0.0.1:6650
subscription-name: sub-celery
subscription-type: Shared
使用方式
消息訂閱
- 生產(chǎn)者與topic進行關(guān)聯(lián)綁定
/**
* 生產(chǎn)者主題綁定
*/
@MqProducer(topics = {"celery"})
@Component
public class MqComponent {
}
- 消費者與topic進行關(guān)聯(lián)綁定,注意:每個消費者須綁定一個subscription-name后才能進行消費
/**
* 消費者主題綁定
*/
@Component
public class MqComponent {
@MqConsumer(topics = {"celery"})
public void receive(String topic, byte[] msg) {
System.out.println("consumer: topic[" + topic + "] => " + new String(msg));
}
}
消息發(fā)送
- send():發(fā)送
- sendAsync():異步發(fā)送
/**
* 消息發(fā)送
*/
@AllArgsConstructor
@Service
public class MqServiceImpl {
private MqHelper mqHelper;
public void test() {
// 同步
String msgId = mqHelper.send("celery", "hello");
System.out.println("send: " + msgId);
// 異步
mqHelper.sendAsync("celery", "您好Async", System.out::println);
// 同步
String msgId = mqHelper.send("celery", new byte[]{0x01, 0x02, 0x03, 0x04});
System.out.println("send: " + msgId);
// 異步
mqHelper.sendAsync("celery", new byte[]{0x01, 0x02, 0x03, 0x04}, System.out::println);
}
}
評論
圖片
表情
