SpringCloud下基于Seata TCC的分布式事務(wù)實(shí)踐
Seata是Spring Cloud Alibaba中一款開源的分布式事務(wù)解決方案,本文具體就Seata的TCC 模式進(jìn)行介紹、實(shí)踐

環(huán)境配置
基礎(chǔ)環(huán)境
首先通過Docker Compose搭建基礎(chǔ)環(huán)境——Nacos、MySQL服務(wù),具體如下
#?Compose?版本
version:?'3.8'
#?定義Docker服務(wù)
services:
??#?Nacos?服務(wù)
??Nacos-Service-1:
????image:?nacos/nacos-server:1.4.2
????container_name:?Nacos-Service-1
????ports:
??????-?"9848:8848"
????environment:
??????MODE:?standalone
????networks:
??????seata_tcc_net:
????????ipv4_address:?130.130.130.32
??#?MySQL?服務(wù)?(用于PayService)
??PayService-DB:
????image:?mysql:5.7
????container_name:?PayService-DB
????ports:
??????-?"9306:3306"
????environment:
??????MYSQL_ROOT_PASSWORD:?12345
????networks:
??????seata_tcc_net:
????????ipv4_address:?130.130.130.36
??#?MySQL?服務(wù)?(用于StorageService)
??StorageService-DB:
????image:?mysql:5.7
????container_name:?StorageService-DB
????ports:
??????-?"9307:3306"
????environment:
??????MYSQL_ROOT_PASSWORD:?12345
????networks:
??????seata_tcc_net:
????????ipv4_address:?130.130.130.37
#?定義網(wǎng)絡(luò)
networks:
??seata_tcc_net:
????ipam:
??????config:
????????-?subnet:?130.130.130.0/24
配置Seata Server
通過Github下載Seata Server,命令如下
wget?https://github.com/seata/seata/releases/download/v1.3.0/seata-server-1.3.0.zip
修改Seata Server下conf目錄的registry.conf文件,將注冊中心、配置中心均設(shè)置Nacos。需要注意的是如果沒有l(wèi)ogs目錄,則需要手動創(chuàng)建該目錄

對于Seata Server而言,其配置信息支持兩種形式:本地文件、配置中心。對于后者而言,我們需要將Seata的相關(guān)配置項(xiàng)導(dǎo)入到配置中心。同樣,我們需要通過Github來下載配置文件config.txt及相應(yīng)的導(dǎo)入腳本nacos-config.sh
#?下載地址:?配置中心的配置項(xiàng)
https://github.com/seata/seata/blob/1.3.0/script/config-center/config.txt
#?下載地址:?用于將配置項(xiàng)導(dǎo)入至Nacos的腳本
https://github.com/seata/seata/blob/1.3.0/script/config-center/nacos/nacos-config.sh
在通過Shell腳本導(dǎo)入配置至Nacos過程中,配置文件config.txt應(yīng)與Shell腳本的上一級目錄保持平行。然后在Shell腳本所在目錄中執(zhí)行如下命令即可
#?執(zhí)行Shell腳本
sh?nacos-config.sh?-h?localhost?-p?9848
該Shell腳本支持的選項(xiàng)如下所示
-h: Nacos服務(wù)的IP地址,默認(rèn)為localhost -p: Nacos服務(wù)的Port端口,默認(rèn)為8848 -g: Nacos分組名,默認(rèn)為SEATA_GROUP -t: Nacos命名空間ID。默認(rèn)為"",即使用public命名空間 -u: Nacos服務(wù)的用戶名 -w: Nacos服務(wù)的密碼
效果如下所示

至此Seata Server相關(guān)環(huán)境及配置就完成了,只需通過Seata Server下bin目錄的seata-server.sh腳本啟動服務(wù)即可。其中-p選項(xiàng)指定服務(wù)使用的端口,默認(rèn)為8091

搭建庫存服務(wù)
POM依賴
通過SpringBoot搭建庫存服務(wù)StorageService。這里給出關(guān)鍵性的依賴及版本,如下所示
<dependencyManagement>
??<dependencies>
??
????
????<dependency>
??????<groupId>org.springframework.bootgroupId>
??????<artifactId>spring-boot-dependenciesartifactId>
??????<version>2.3.2.RELEASEversion>
??????<type>pomtype>
??????<scope>importscope>
????dependency>
??
????
????<dependency>
??????<groupId>org.springframework.cloudgroupId>
??????<artifactId>spring-cloud-dependenciesartifactId>
??????<version>Hoxton.SR8version>
??????<type>pomtype>
??????<scope>importscope>
????dependency>
??
????
????<dependency>
??????<groupId>com.alibaba.cloudgroupId>
??????<artifactId>spring-cloud-alibaba-dependenciesartifactId>
??????<version>2.2.3.RELEASEversion>
??????<type>pomtype>
??????<scope>importscope>
????dependency>
??dependencies>
dependencyManagement>
<dependencies>
??
??<dependency>
????<groupId>com.alibaba.cloudgroupId>
????<artifactId>spring-cloud-starter-alibaba-seataartifactId>
????<exclusions>
??????<exclusion>
????????<groupId>io.seatagroupId>
????????<artifactId>seata-spring-boot-starterartifactId>
??????exclusion>
????exclusions>
??dependency>
??
??<dependency>
????<groupId>io.seatagroupId>
????<artifactId>seata-spring-boot-starterartifactId>
????<version>1.3.0version>
??dependency>
??
??<dependency>
????<groupId>com.alibaba.cloudgroupId>
????<artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
??dependency>
??
??<dependency>
????<groupId>com.alibabagroupId>
????<artifactId>fastjsonartifactId>
????<version>1.2.76version>
??dependency>
??
??<dependency>
????<groupId>com.baomidougroupId>
????<artifactId>mybatis-plus-boot-starterartifactId>
????<version>3.4.1version>
??dependency>
??
dependencies>
服務(wù)配置
該服務(wù)的配置文件如下所示
server:
??port:?8080
spring:
??application:
????name:?StorageService
??datasource:
????type:?com.alibaba.druid.pool.DruidDataSource
????driver-class-name:?com.mysql.jdbc.Driver
????url:?jdbc:mysql://localhost:9307/StorageDb?allowPublicKeyRetrieval=true&useSSL=false
????username:?root
????password:?12345
??cloud:
????nacos:
??????discovery:
????????#?注冊中心?Nacos?地址信息
????????server-addr:?127.0.0.1:9848
????alibaba:
??????seata:
????????#?配置使用的事務(wù)分組名稱
????????tx-service-group:?my_test_tx_group
#?Mybatis-Plus?配置
mybatis-plus:
??mapper-locations:?classpath:mapper/*.xml
#?Seata?Server配置
seata:
??#?Seata服務(wù)端所在注冊中心的配置信息
??registry:
????#?注冊中心類型
????type:?nacos
????nacos:
??????#?Seata服務(wù)端的服務(wù)名
??????application:?seata-server
??????#?Seata服務(wù)端所在的注冊中心信息
??????server-addr:?127.0.0.1:9848
??????username:?nacos
??????password:?nacos
??????group:?SEATA_GROUP
??#?Seata服務(wù)端所在配置中心的配置信息
??config:
????type:?nacos
????nacos:
??????#?Seata服務(wù)端所在的配置中心信息
??????server-addr:?127.0.0.1:9848
??????username:?nacos
??????password:?nacos
??????group:?SEATA_GROUP
??#?使能Seata自動代理數(shù)據(jù)源
??enable-auto-data-source-proxy:?true
#?Actuator配置:?開啟所有端點(diǎn)
management:
??endpoints:
????web:
??????exposure:
????????include:?"*"
??????base-path:?/actuator
服務(wù)實(shí)現(xiàn)
這里直接添加一個(gè)Controller類用以實(shí)現(xiàn)庫存扣減,核心代碼實(shí)現(xiàn)如下
@RestController
@RequestMapping("goods")
@Slf4j
public?class?GoodsController?{
????@Autowired
????private?GoodsStorageService?goodsStorageService;
????@RequestMapping("/sell")
????public?String?sell(@RequestBody?GoodsDto?goodsDto)?{
????????String?msg?=?"success";
????????goodsStorageService.sell(null,?goodsDto);
????????return?msg;
????}
}
...
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public?class?GoodsDto?{
????/**
?????*?商品名
?????*/
????private?String?goodsName;
????/**
?????*?數(shù)量
?????*/
????private?int?num;
}
作為TCC的分布式事務(wù)方案來說,對于一個(gè)售賣商品扣減庫存的過程。需要根據(jù)Try-Confirm-Cancel的設(shè)計(jì)要求,將庫存的扣減分為兩階段完成。首先在GoodsStorageService接口中定義sell、confirmSell、cancelSell方法,然后在接口上添加@LocalTCC注解,最后在Try方法上添加@TwoPhaseBusinessAction注解。具體地,@TwoPhaseBusinessAction注解的name屬性只需保證唯一性即可、commitMethod/rollbackMethod屬性用來設(shè)置兩階段調(diào)用的方法名。TCC中各方法的BusinessActionContext參數(shù)是TCC兩階段之間用來傳遞參數(shù)的context上下文,故在Controller中調(diào)用Try方法時(shí)BusinessActionContext參數(shù)只需傳null值即可。并將通過@BusinessActionContextParameter注解將相關(guān)參數(shù)以指定名稱存入context上下文。與此同時(shí)還在接口中提供了一個(gè)默認(rèn)方法getParamByContext,以便于二階段時(shí)從context上下文獲取參數(shù)。為了保證TCC二階段的Confirm、Cancel接口的冪等性,這里在實(shí)現(xiàn)類中通過向resultHolder存入xid全局事務(wù)ID進(jìn)行冪等控制
@LocalTCC
public?interface?GoodsStorageService?{
????/**
?????*?定義context中參數(shù)名
?????*/
????String?paramName?=?"params";
????/**
?????*?從context中獲取指定參數(shù)名所對應(yīng)的值
?????*?@param?context
?????*?@return
?????*/
????default?GoodsDto?getParamByContext(BusinessActionContext?context)?{
????????JSONObject?jsonObject?=?(JSONObject)?context.getActionContext(paramName);
????????GoodsDto?goodsDto?=?jsonObject.toJavaObject(GoodsDto.class);
????????return?goodsDto;
????}
????/**
?????*?Try方法:?售賣
?????*?@param?goodsDto
?????*?@return
?????*/
????@TwoPhaseBusinessAction(name?=?"sell",?commitMethod?=?"confirmSell",?rollbackMethod?=?"cancelSell")
????int?sell(BusinessActionContext?context,
????????@BusinessActionContextParameter(paramName?=?paramName)?GoodsDto?goodsDto);
????/**
?????*?Confirm方法:?確認(rèn)售賣
?????*?@param?context
?????*?@return
?????*/
????void?confirmSell(BusinessActionContext?context);
????/**
?????*?Cancel方法:?取消售賣
?????*?@param?context
?????*?@return
?????*/
????void?cancelSell(BusinessActionContext?context);
}
...
@Service
@Slf4j
public?class?GoodsStorageServiceImpl?implements?GoodsStorageService?{
????private?static?Set?resultHolder?=?new?ConcurrentHashSet<>();
????@Autowired
????private?GoodsStorageMapper?goodsStorageMapper;
????@Override
????public?int?sell(BusinessActionContext?context,?GoodsDto?goodsDto)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????int?result?=?goodsStorageMapper.sell(goodsDto);
????????log.info("[Goods?Storage?Service]:?result:?{}",?result);
????????if(?result?!=?1?)?{
????????????throw?new?RuntimeException("商品庫存不足");
????????}
????????resultHolder.add(?xid?);
????????return?result;
????}
????@Override
????public?void?confirmSell(BusinessActionContext?context)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????//?冪等設(shè)計(jì):?防止重復(fù)提交
????????if(?!resultHolder.contains(xid)?)?{
????????????return;
????????}
????????GoodsDto?goodsDto?=?getParamByContext(context);
????????goodsStorageMapper.confirmSell(goodsDto);
????????resultHolder.remove(xid);
????????log.info("[Goods?Storage?Service]:?confirm?sell");
????}
????@Override
????public?void?cancelSell(BusinessActionContext?context)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????//?1.?冪等設(shè)計(jì):?防止重復(fù)回滾;?2.?實(shí)現(xiàn)空回滾
????????if(?!resultHolder.contains(xid)?)?{
????????????return;
????????}
????????GoodsDto?goodsDto?=?getParamByContext(context);
????????goodsStorageMapper.cancelSell(goodsDto);
????????resultHolder.remove(xid);
????????log.info("[Goods?Storage?Service]:?cancel?sell");
????}
}
DB層面
商品庫存表對應(yīng)的實(shí)體類GoodsStorage如下所示
/**
?*?商品庫存
?*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("goods_storage")
public?class?GoodsStorage?{
????@TableId
????private?int?id;
????/**
?????*?商品名
?????*/
????private?String?goodsName;
????/**
?????*?可用庫存數(shù)
?????*/
????private?int?stock;
????/**
?????*?售出數(shù)
?????*/
????private?int?soldNum;
????/**
?????*?凍結(jié)庫存數(shù)
?????*/
????private?int?freezeNum;
}
而售賣商品扣減庫存的各階段方法所使用的SQL如下所示,至此就可以明白freezeNum凍結(jié)庫存數(shù)這一中間狀態(tài)的含義。這也是TCC方案兩階段的具體體現(xiàn)
<update?id="sell"?parameterType="com.aaron.StorageService.dto.GoodsDto">
????update?goods_storage
????set?stock?=?stock?-?#{num},?freeze_num?=?freeze_num?+?#{num}
????where?goods_name?=?#{goodsName}
????and?(stock?-?#{num})?>=?0
update>
<update?id="confirmSell"?parameterType="com.aaron.StorageService.dto.GoodsDto">
????update?goods_storage
????set?sold_num?=?sold_num?+?#{num},?freeze_num?=?freeze_num?-?#{num}
????where?goods_name?=?#{goodsName}
update>
<update?id="cancelSell"?parameterType="com.aaron.StorageService.dto.GoodsDto">
????update?goods_storage
????set?stock?=?stock?+?#{num},?freeze_num?=?freeze_num?-?#{num}
????where?goods_name?=?#{goodsName}
update>
搭建支付服務(wù)
為了驗(yàn)證分布式事務(wù),自然不能只有一個(gè)微服務(wù)。故這里類似地我們再搭建一個(gè)PayService支付服務(wù)。當(dāng)然基本搭建過程與StorageService服務(wù)并無明顯差異。首先在POM依賴方面,PayService服務(wù)的POM依賴與StorageService服務(wù)一致,同樣也需要引入Seata、Nacos等相關(guān)依賴。其次在服務(wù)配置方面,PayService服務(wù)的application.yml配置文件中關(guān)于Seata、Nacos相關(guān)的配置自然與StorageService服務(wù)并無二致。但需調(diào)整修改其所連接的數(shù)據(jù)庫信息,部分配置如下所示
server:
??port:?90
spring:
??application:
????name:?PayService
??datasource:
????type:?com.alibaba.druid.pool.DruidDataSource
????driver-class-name:?com.mysql.jdbc.Driver
????url:?jdbc:mysql://localhost:9306/PayDb?allowPublicKeyRetrieval=true&useSSL=false
????username:?root
????password:?12345
這里添加一個(gè)Controller用于進(jìn)行余額、庫存的扣減。作為分布式事務(wù)的發(fā)起者,這里需要添加一個(gè)@GlobalTransactional注解
@RestController
@RequestMapping("pay")
@Slf4j
public?class?PayController?{
????@Autowired
????private?RestTemplate?restTemplate;
????@Autowired
????private?PayService?payService;
????@GlobalTransactional
????@RequestMapping("/buy")
????public?String?buy()?{
????????//?1.?扣余額
????????PayDto?payDto?=?new?PayDto("Aaron",2000);
????????payService.pay(null,?payDto);
????????//?2.?扣庫存
????????String?url?=?"http://StorageService/goods/sell";
????????GoodsDto?goodsDto?=?new?GoodsDto("iPhone",?5);
????????String?response?=?restTemplate.postForObject(url,?goodsDto,?String.class);
????????return?"complete";
????}
}
...
@Configuration
public?class?RestTemplateConfig?{
????@Bean
????@LoadBalanced
????public?RestTemplate?restTemplate()?{
????????return?new?RestTemplate();
????}
}
...
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public?class?PayDto?{
????/**
?????*?姓名
?????*/
????private?String?name;
????/**
?????*?金額
?????*/
????private?int?money;
}
類似地,Service層同樣按照TCC的設(shè)計(jì)原則進(jìn)行設(shè)計(jì)
@LocalTCC
public?interface?PayService?{
????/**
?????*?定義context中參數(shù)名
?????*/
????String?paramName?=?"params";
????/**
?????*?從context中獲取指定參數(shù)名所對應(yīng)的值
?????*?@param?context
?????*?@return
?????*/
????default?PayDto?getParamByContext(BusinessActionContext?context)?{
????????JSONObject?jsonObject?=?(JSONObject)?context.getActionContext(paramName);
????????PayDto?payDto?=?jsonObject.toJavaObject(PayDto.class);
????????return?payDto;
????}
????/**
?????*?會員進(jìn)行支付
?????*?@param?payDto
?????*?@return
?????*/
????@TwoPhaseBusinessAction(name?=?"pay",?commitMethod?=?"confirmPay",?rollbackMethod?=?"cancelPay")
????int?pay(BusinessActionContext?context,
????????@BusinessActionContextParameter(paramName?=?paramName)?PayDto?payDto?);
????/**
?????*?確認(rèn)支付
?????*?@param?context
?????*?@return
?????*/
????void?confirmPay(BusinessActionContext?context);
????/**
?????*?取消支付
?????*?@param?context
?????*?@return
?????*/
????void?cancelPay(BusinessActionContext?context);
}
...
@Service
@Slf4j
public?class?PayServiceImpl?implements?PayService?{
????private?static?Set?resultHolder?=?new?ConcurrentHashSet<>();
????@Autowired
????private?PayMapper?payMapper;
????@Override
????public?int?pay(BusinessActionContext?context,?PayDto?payDto)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????int?result?=?payMapper.pay(payDto);
????????log.info("[Pay?Service]:?result:?{}",?result);
????????if(?result?!=?1?)?{
????????????throw?new?RuntimeException("賬戶余額不足");
????????}
????????resultHolder.add(?xid?);
????????return?result;
????}
????@Override
????public?void?confirmPay(BusinessActionContext?context)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????//?冪等設(shè)計(jì):?防止重復(fù)提交
????????if(?!resultHolder.contains(xid)?)?{
????????????return;
????????}
????????PayDto?payDto?=?getParamByContext(context);
????????payMapper.confirmPay(?payDto?);
????????resultHolder.remove(xid);
??????????log.info("[Pay?Service]:?confirm?pay");
????}
????@Override
????public?void?cancelPay(BusinessActionContext?context)?{
????????//?獲取全局事務(wù)ID
????????String?xid?=?context.getXid();
????????//?1.?冪等設(shè)計(jì):?防止重復(fù)回滾;?2.?實(shí)現(xiàn)空回滾
????????if(?!resultHolder.contains(xid)?)?{
????????????return;
????????}
????????PayDto?payDto?=?getParamByContext(context);
????????payMapper.cancelPay(?payDto?);
????????resultHolder.remove(xid);
????????log.info("[Pay?Service]:?cancel?pay");
????}
}
余額表對應(yīng)的實(shí)體類Pay如下所示
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("member_balance")
public?class?Pay?{
????@TableId
????private?int?id;
????/**
?????*?姓名
?????*/
????private?String?name;
????/**
?????*?余額
?????*/
????private?int?balance;
????/**
?????*?凍結(jié)金額
?????*/
????private?int?freeze;
}
而支付的各階段方法所使用的SQL如下所示,其同樣通過中間狀態(tài)freeze凍結(jié)金額這一中間狀態(tài)實(shí)現(xiàn)TCC的兩階段
<update?id="pay"?parameterType="com.aaron.PayService.dto.PayDto">
????update?member_balance
????set?balance?=?balance?-?#{money},?freeze?=?freeze?+?#{money}
????where?name?=?#{name}
????and?(balance?-?#{money})?>=?0
update>
<update?id="confirmPay"?parameterType="com.aaron.PayService.dto.PayDto">
????update?member_balance
????set?freeze?=?freeze?-?#{money}
????where?name?=?#{name}
update>
<update?id="cancelPay"?parameterType="com.aaron.PayService.dto.PayDto">
????update?member_balance
????set?balance?=?balance?+?#{money},?freeze?=?freeze?-?#{money}
????where?name?=?#{name}
update>
測試
在測試之前,需要對PayService支付服務(wù)、StorageService庫存服務(wù)各自的數(shù)據(jù)庫完成表的建立及數(shù)據(jù)初始化工作,如下圖所示

分別在90、8080端口啟動PayService支付服務(wù)、StorageService庫存服務(wù),當(dāng)我們第一次調(diào)用PayService支付服務(wù)的buy接口時(shí),可以看到余額、庫存均被正??蹨p

而當(dāng)?shù)诙握{(diào)用該接口時(shí),由于商品庫存不足。則會導(dǎo)致整個(gè)分布式事務(wù)進(jìn)行回滾??梢钥吹接囝~、庫存的數(shù)據(jù)由于被正常回滾,故未發(fā)生意外扣除

