CAP-分布式事務(wù)的解決方案

CAP 是一個基于 .NET Standard 的 C# 庫,它是一種處理分布式事務(wù)的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。
https://github.com/dotnetcore/CAP
在我們構(gòu)建 SOA 或者 微服務(wù)系統(tǒng)的過程中,我們通常需要使用事件來對各個服務(wù)進行集成,在這過程中簡單的使用消息隊列并不能保證數(shù)據(jù)的最終一致性, CAP 采用的是和當(dāng)前數(shù)據(jù)庫集成的本地消息表的方案來解決在分布式系統(tǒng)互相調(diào)用的各個環(huán)節(jié)可能出現(xiàn)的異常,它能夠保證任何情況下事件消息都是不會丟失的。
你同樣可以把 CAP 當(dāng)做 EventBus 來使用,CAP提供了一種更加簡單的方式來實現(xiàn)事件消息的發(fā)布和訂閱,在訂閱以及發(fā)布的過程中,你不需要繼承或?qū)崿F(xiàn)任何接口。
這是CAP集在ASP.NET Core 微服務(wù)架構(gòu)中的一個示意圖:
架構(gòu)預(yù)覽

CAP 實現(xiàn)了 eShop 電子書 中描述的發(fā)件箱模式
Getting Started
NuGet
你可以運行以下下命令在你的項目中安裝 CAP。
PM> Install-Package DotNetCore.CAP
CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息隊列,你可以按需選擇下面的包進行安裝:
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的擴展作為數(shù)據(jù)庫存儲:
// 按需選擇安裝你正在使用的數(shù)據(jù)庫
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
Configuration
首先配置CAP到 Startup.cs 文件中,如下:
public void ConfigureServices(IServiceCollection services)
{
......
services.AddDbContext<AppDbContext>();
services.AddCap(x =>
{
//如果你使用的 EF 進行數(shù)據(jù)操作,你需要添加如下配置:
x.UseEntityFramework<AppDbContext>(); //可選項,你不需要再次配置 x.UseSqlServer 了
//如果你使用的ADO.NET,根據(jù)數(shù)據(jù)庫選擇進行配置:
x.UseSqlServer("數(shù)據(jù)庫連接字符串");
x.UseMySql("數(shù)據(jù)庫連接字符串");
x.UsePostgreSql("數(shù)據(jù)庫連接字符串");
//如果你使用的 MongoDB,你可以添加如下配置:
x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群
//CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據(jù)使用選擇配置:
x.UseRabbitMQ("ConnectionStrings");
x.UseKafka("ConnectionStrings");
x.UseAzureServiceBus("ConnectionStrings");
});
}
發(fā)布
在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進行消息發(fā)送
public class PublishController : Controller
{
private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
//不使用事務(wù)
[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("xxx.services.show.time", DateTime.Now);
return Ok();
}
//Ado.Net 中使用事務(wù),自動提交
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}
return Ok();
}
//EntityFramework 中使用事務(wù),自動提交
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
return Ok();
}
}
訂閱
Action Method
在 Action 上添加 CapSubscribeAttribute 來訂閱相關(guān)消息。
public class PublishController : Controller
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
}
}
Service Method
如果你的訂閱方法沒有位于 Controller 中,則你訂閱的類需要繼承 ICapSubscribe:
namespace xxx.Service
{
public interface ISubscriberService
{
void CheckReceivedMessage(DateTime datetime);
}
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
}
}
}
然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 類
public void ConfigureServices(IServiceCollection services)
{
//注意: 注入的服務(wù)需要在 `services.AddCap()` 之前
services.AddTransient<ISubscriberService,SubscriberService>();
services.AddCap(x=>{});
}
訂閱者組
訂閱者組的概念類似于 Kafka 中的消費者組,它和消息隊列中的廣播模式相同,用來處理不同微服務(wù)實例之間同時消費相同的消息。
當(dāng)CAP啟動的時候,她將創(chuàng)建一個默認的消費者組,如果多個相同消費者組的消費者消費同一個Topic消息的時候,只會有一個消費者被執(zhí)行。相反,如果消費者都位于不同的消費者組,則所有的消費者都會被執(zhí)行。
相同的實例中,你可以通過下面的方式來指定他們位于不同的消費者組。
[CapSubscribe("xxx.services.show.time", Group = "group1" )]
public void ShowTime1(DateTime datetime)
{
}
[CapSubscribe("xxx.services.show.time", Group = "group2")]
public void ShowTime2(DateTime datetime)
{
}
ShowTime1 和 ShowTime2 處于不同的組,他們將會被同時調(diào)用。
PS,你可以通過下面的方式來指定默認的消費者組名稱:
services.AddCap(x =>
{
x.DefaultGroup = "default-group-name";
});
Dashboard
CAP 2.1+ 以上版本中提供了儀表盤(Dashboard)功能,你可以很方便的查看發(fā)出和接收到的消息。除此之外,你還可以在儀表盤中實時查看發(fā)送或者接收到的消息。
使用一下命令安裝 Dashboard:
PM> Install-Package DotNetCore.CAP.Dashboard
在分布式環(huán)境中,儀表盤內(nèi)置集成了 Consul 作為節(jié)點的注冊發(fā)現(xiàn),同時實現(xiàn)了網(wǎng)關(guān)代理功能,你同樣可以方便的查看本節(jié)點或者其他節(jié)點的數(shù)據(jù),它就像你訪問本地資源一樣。
services.AddCap(x =>
{
//...
// 注冊 Dashboard
x.UseDashboard();
// 注冊節(jié)點到 Consul
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "localhost";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 5800;
d.NodeId = 1;
d.NodeName = "CAP No.1 Node";
});
});
儀表盤默認的訪問地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置項中修改cap路徑后綴為其他的名字。

【推薦】.NET Core開發(fā)實戰(zhàn)視頻課程 ★★★
.NET Core實戰(zhàn)項目之CMS 第一章 入門篇-開篇及總體規(guī)劃
【.NET Core微服務(wù)實戰(zhàn)-統(tǒng)一身份認證】開篇及目錄索引
Redis基本使用及百億數(shù)據(jù)量中的使用技巧分享(附視頻地址及觀看指南)
.NET Core中的一個接口多種實現(xiàn)的依賴注入與動態(tài)選擇看這篇就夠了
用abp vNext快速開發(fā)Quartz.NET定時任務(wù)管理界面
在ASP.NET Core中創(chuàng)建基于Quartz.NET托管服務(wù)輕松實現(xiàn)作業(yè)調(diào)度
現(xiàn)身說法:實際業(yè)務(wù)出發(fā)分析百億數(shù)據(jù)量下的多表查詢優(yōu)化
