分布式事務解決方案之RabbitMQ
點擊上方 Java學習之道,選擇 設為星標
作者: _沸羊羊_
來源: juejin.cn/post/7007040162676883486
Part1前言
之前介紹了分布式事務解決方案:2PC,3PC,TCC機制,今天主要介紹基于 RabbitMQ 的分布式事務解決方案,Let's go !
Part2分布式事務問題
以電商業(yè)務為例,訂單服務對應的訂單數(shù)據(jù)庫,庫存服務對應的庫存數(shù)據(jù)庫,多個數(shù)據(jù)源之間存在了分布式事務問題。如何保證在訂單生成后,正確的扣除庫存,或者在訂單生成失敗時,還原扣除的庫存,這就是分布式事務將要解決的問題。

解決分布式事務前的業(yè)務邏輯:
@Transaction(rollbackFor=Exception.class)
public void createOrder(OrderInfo orderInfo) throws Exception {
// 訂單系統(tǒng) —— 生成訂單
orderService.saveOrder(orderInfo);
// 庫存系統(tǒng) —— 扣除庫存
if(!"success".equals(reduceStock(orderInfo))){
throw new Exception("訂單創(chuàng)建失敗,原因:庫存扣除失敗!");
}
}
上述存在的問題:
庫存服務的接口調用成功,訂單數(shù)據(jù)庫事務提交失敗,庫存沒有回滾,導致訂單生成失敗但扣除了庫存 庫存服務的接口調用超時,訂單系統(tǒng)數(shù)據(jù)庫事務被回滾,庫存系統(tǒng)接口繼續(xù)執(zhí)行,導致訂單生成失敗但扣除了庫存
Part3解決方案

生產(chǎn)者:訂單系統(tǒng)將訂單信息的消息發(fā)送到 RabbitMQ 中,消息到達交換機后通過 confirm 機制將接收成功的消息返回給訂單系統(tǒng)(生產(chǎn)者),生產(chǎn)者收到正確的 confirm 后給用戶返回訂單創(chuàng)建成功的響應。
隊列:為了防止 RabbitMQ 在收到消息后還未發(fā)送給消費者處理就掛了的情況,將隊列和消息分別持久化。
消費者:消費者接收到消息后,為了確保消息能夠被成功正確的處理,需要考慮消息丟失問題和消息冪等性等問題。
可靠的發(fā)送消息
在訂單系統(tǒng)創(chuàng)建訂單的事務中添加把消息保存到訂單數(shù)據(jù)庫的業(yè)務邏輯,將消息在生產(chǎn)者中持久化。
public class OrderDBService {
public void createOrder(OrderInfo orderInfo) throws Exception {
// 創(chuàng)建訂單
String sql = "...";
int count = jdbcTemplate.insert(sql);
if(count != 1) {
throw new Exception("訂單創(chuàng)建失敗,原因:數(shù)據(jù)庫操作失敗!");
}
// 將消息保存到 MQ
saveMQMessage(orderInfo);
}
/**
* 將消息持久化,此時的消息狀態(tài)字段為:未發(fā)送
*/
public void saveMQMessage(OrderInfo orderInfo) throws Exception {
String sql = "...";
int count = jdbcTemplate.insert(sql);
if(count != 1) {
throw new Exception("消息記錄失敗,原因:數(shù)據(jù)庫操作失??!");
}
}
}
創(chuàng)建訂單成功后,將訂單消息發(fā)送到 MQ 中,開啟發(fā)布確定機制;生產(chǎn)者收到確認后在回調方法中更新本地消息表的消息狀態(tài)為:已發(fā)送。
spring:
rabbitmq:
......
# 開啟 confirm 模式
publisher-confirm-type: correlated
@Component
@Slf4j
public class MQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(OrderInfo orderInfo) {
// 發(fā)送消息到 MQ
rabbitTemplate.convertAndSend("directEx", "", orderInfo);
// 回調
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息發(fā)送成功!");
} else {
log.info("消息發(fā)送失?。?);
}
});
}
}
消息的持久化
將隊列聲明為持久化
@Bean
public Queue getRoutingQueue() {
// 參數(shù):queueName,durable,exclusive,autoDelete
return new Queue("routingQueue", true, false, true);
}
將消息聲明為持久化
public void send(String msg) {
// 持久化消息
Message message = MessageBuilder.withBody(msg.getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 發(fā)送消息到 MQ
rabbitTemplate.convertAndSend("directEx", "", message);
// 回調
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息發(fā)送成功!");
} else {
log.info("消息發(fā)送失??!");
}
});
}
可靠的消費消息
保證消費者可靠消費有兩個方面,消息丟失和消息冪等性問題。
使用手動ACK機制解決消息丟失問題。
spring:
rabbitmq:
listener:
simple:
# 開啟手動ACK
acknowledge-mode: manual
@Component
@Slf4j
@RabbitListener(queues = "routingQueue")
public class Consumer {
@RabbitHandler
public void process(String msg, Message message, Channel channel) throws IOException {
try {
log.info("consumer received msg : " + msg);
// 手動ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.info("消息入隊失敗");
// 重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
配置 RabbitMQ 的 retry 機制,當消費者出現(xiàn)發(fā)送 NACK或異常時,會根據(jù)配置的 retry 機制進行重試。
解決消息的冪等性問題可以給消息添加 messageID,并且消費者消費成功后放入將 messageID 放入 redis 中。
// 生產(chǎn)者發(fā)送消息前設置 messageID
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
@Component
@Slf4j
@RabbitListener(queues = "routingQueue")
public class Consumer {
@Autowired
private RedisTemplate redisTemplate;
@RabbitHandler
public void process(String msg, Message message, Channel channel) throws IOException {
// redis 的 setnx 命令,如果 messageID 已存在代表此消息已被消費過
if (!redisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(), msg)) {
return;
}
try {
log.info("consumer received msg : " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("消息入隊失敗");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
綜上,使用手動 ACK 和 retry 機制解決了消息丟失的問題,使用 messageID 存到 redis 解決了消息冪等性問題。
Part4總結

-
| 更多精彩文章 -
▽加我微信,交個朋友 長按/掃碼添加↑↑↑



