Golang 從零到一開發(fā)實現(xiàn) RPC 框架(二)集群實現(xiàn)
內(nèi)容提要
在上一篇文章中分享了如何從零開始搭建一個 RPC 框架,并完成了 P2P 版本功能,本章繼續(xù)完善增加服務(wù)注冊發(fā)現(xiàn)和負載均衡實現(xiàn)集群能力。
傳送門:RPC框架(一)
本文主要內(nèi)容包括:
RPC 接入服務(wù)注冊中心 服務(wù)端實現(xiàn)平滑啟停
客戶端實現(xiàn)服務(wù)發(fā)現(xiàn)
客戶端實現(xiàn)負載均衡
客戶端實現(xiàn)失敗策略
服務(wù)注冊發(fā)現(xiàn)
在 P2P 版本 RPC 中,客戶端要知道服務(wù)端的地址,并發(fā)起點對點連接,雖然滿足了服務(wù)調(diào)用的能力,但其弊端也顯而易見。為了保障服務(wù)高可用,通常會冗余部署多個服務(wù)端實例,而客戶端如何知道每一個服務(wù)實例的調(diào)用地址,服務(wù)端實例上下線又如何告知客戶端,這就需要引入服務(wù)自動注冊發(fā)現(xiàn)的能力。


注冊發(fā)現(xiàn)是指客戶端具備動態(tài)發(fā)現(xiàn)服務(wù)端實例的能力,一般借助服務(wù)注冊中心來實現(xiàn),開源注冊中心有“Eurake”或“Nacos”等,本人之前專門有文章講過其實現(xiàn),對應(yīng)項目為 “service_discovery”,這里將以它為服務(wù)注冊中心,完成客戶端接入。
具體參閱:
首先定義客戶端接口,既要滿足服務(wù)提供者注冊/下線的能力,又要滿足服務(wù)消費者發(fā)現(xiàn)/觀察的能力。
type Registry interface {
Register(context.Context, *Instance) (context.CancelFunc, error)
Fetch(context.Context, string) ([]*Instance, bool)
Close() error
}
naming/naming.go
type Discovery struct {
once *sync.Once
conf *Config
ctx context.Context
cancelFunc context.CancelFunc
//local cache
mutex sync.RWMutex
apps map[string]*FetchData
registry map[string]struct{}
//registry center node
idx uint64 //node index
node atomic.Value //node list
}
func New(conf *Config) *Discovery {
if len(conf.Nodes) == 0 {
panic("conf nodes empty!")
}
ctx, cancel := context.WithCancel(context.Background())
dis := &Discovery{
ctx: ctx,
cancelFunc: cancel,
conf: conf,
apps: map[string]*FetchData{},
registry: map[string]struct{}{},
}
//from conf get node list
dis.node.Store(conf.Nodes)
go dis.updateNode()
return dis
}
naming/discovery.go
func (dis *Discovery) updateNode() {
ticker := time.NewTicker(NodeInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
uri := fmt.Sprintf(_nodesURL, dis.pickNode())
log.Println("discovery - request and update node, url:" + uri)
params := make(map[string]interface{})
params["env"] = dis.conf.Env
resp, err := HttpPost(uri, params)
if err != nil {
log.Println(err)
continue
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if err != nil {
log.Println(err)
continue
}
newNodes := []string{}
for _, ins := range res.Data.Instances {
for _, addr := range ins.Addrs {
newNodes = append(newNodes, strings.TrimPrefix(addr, "http://"))
}
}
if len(newNodes) == 0 {
continue
}
curNodes := dis.node.Load().([]string)
if !compareNodes(curNodes, newNodes) {
dis.node.Store(newNodes)
log.Println("nodes list changed!", newNodes)
log.Println(newNodes)
} else {
log.Println("nodes list not change:", curNodes)
}
}
}
}
naming/discovery.go
//對比兩個數(shù)據(jù)是否完全相等
func compareNodes(a, b []string) bool {
if len(a) != len(b) {
return false
}
mapB := make(map[string]struct{}, len(b))
for _, node := range b {
mapB[node] = struct{}{}
}
for _, node := range a {
if _, ok := mapB[node]; !ok {
return false
}
}
return true
}
naming/discovery.go
實現(xiàn)服務(wù)注冊能力,先檢測本地緩存查看是否已注冊,沒有則請求注冊中心并發(fā)起注冊,異步維護一個定時任務(wù)來維持心跳(續(xù)約),如果發(fā)生終止則會調(diào)用取消接口從注冊中心注銷。
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
var err error
//check local cache
dis.mutex.Lock()
if _, ok := dis.registry[instance.AppId]; ok {
err = errors.New("instance duplicate register")
} else {
dis.registry[instance.AppId] = struct{}{} //register local cache
}
dis.mutex.Unlock()
if err != nil {
return nil, err
}
//http register
ctx, cancel := context.WithCancel(dis.ctx)
if err = dis.register(instance); err != nil {
//fail
dis.mutex.Lock()
delete(dis.registry, instance.AppId)
dis.mutex.Unlock()
return cancel, err
}
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
//renew&cancel
go func() {
ticker := time.NewTicker(RenewInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := dis.renew(instance); err != nil {
dis.register(instance)
}
case <-ctx.Done():
dis.cancel(instance)
ch <- struct{}{}
}
}
}()
return cancelFunc, nil
}
naming/discovery.go
func (dis *Discovery) Fetch(ctx context.Context, appId string) ([]*Instance, bool) {
//from local
dis.mutex.RLock()
fetchData, ok := dis.apps[appId]
dis.mutex.RUnlock()
if ok {
log.Println("get data from local memory, appid:" + appId)
return fetchData.Instances, ok
}
//from remote
uri := fmt.Sprintf(_fetchURL, dis.pickNode())
params := make(map[string]interface{})
params["env"] = dis.conf.Env
params["appid"] = appId
params["status"] = 1 //up
resp, err := HttpPost(uri, params)
if err != nil {
dis.switchNode()
return nil, false
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if res.Code != 200 {
return nil, false
}
if err != nil {
log.Println(err)
return nil, false
}
var result []*Instance
for _, ins := range res.Data.Instances {
result = append(result, ins)
}
if len(result) > 0 {
ok = true
dis.mutex.Lock()
dis.apps[appId] = &res.Data
dis.mutex.Unlock()
}
return result, ok
}
naming/discovery.go
服務(wù)端改造
服務(wù)端與注冊中心的交互包括服務(wù)啟動時會將自身服務(wù)信息(監(jiān)聽地址和端口)寫入注冊中心,開啟定時續(xù)約,在服務(wù)關(guān)閉退出時會注銷自身的注冊信息。

服務(wù)啟動注冊
type RPCServer struct {
listener Listener
++ registry naming.Registry
}
func NewRPCServer(option Option, registry naming.Registry) *RPCServer {
return &RPCServer{
listener: NewRPCListener(option),
++ registry: registry,
option: option,
}
}
provider/server.go
func main() {
//服務(wù)注冊中心
conf := &naming.Config{Nodes: config.RegistryAddrs, Env: config.Env}
discovery := naming.New(conf)
//注入依賴
srv := provider.NewRPCServer(option, discovery)
}
demo/server/server.go

func (svr *RPCServer) Run() {
//先啟動后暴露服務(wù)
err := svr.listener.Run()
if err != nil {
panic(err)
}
//register in discovery,注冊失?。ㄖ卦囀。┩顺龇?wù)
err = svr.registerToNaming()
if err != nil {
svr.Close() //注冊失敗關(guān)閉服務(wù)
panic(err)
}
}
func (svr *RPCServer) registerToNaming() error {
instance := &naming.Instance{
Env: svr.option.Env,
AppId: svr.option.AppId,
Hostname: svr.option.Hostname,
Addrs: svr.listener.GetAddrs(),
}
retries := maxRegisterRetry
for retries > 0 {
retries--
cancel, err := svr.registry.Register(context.Background(), instance)
if err == nil {
svr.cancelFunc = cancel
return nil
}
}
return errors.New("register to naming server fail")
}
provider/server.go
做個測試,先啟動服務(wù)注冊中心(service_discovery),再運行 demo/server,通過配置不同端口和hostname,啟動兩個服務(wù),從服務(wù)注冊中心可以看到其結(jié)果。

服務(wù)退出注銷
服務(wù)端從注冊中心注銷后,客戶端從注冊中心感知服務(wù)下線,就不再發(fā)送新連接和請求到該服務(wù)端實例。
這里也可能有些問題,由于客戶端緩存機制導(dǎo)致客戶端感知服務(wù)端變化滯后,仍會有少許時間新連接和請求提交到當(dāng)前服務(wù)端。目前由于還未使用長鏈接管理,無法知曉有哪些客戶端連接。如果此時服務(wù)仍存活就正常處理返回,如果失敗可以返回“特殊失敗碼“,告知客戶端不要再請求了,服務(wù)端關(guān)閉了。

func (svr *RPCServer) Close() {
//從服務(wù)注冊中心注銷
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//關(guān)閉當(dāng)前服務(wù)
if svr.listener != nil {
svr.listener.Close()
}
}
func (svr *RPCServer) registerToNaming() error {
++ cancel, err := svr.registry.Register(context.Background(), instance)
++ svr.cancelFunc = cancel
}
//注冊中心注冊 (naming/discovery.go)
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
ctx, cancel := context.WithCancel(dis.ctx)
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
for {
select {
case <-ctx.Done():
dis.cancel(instance) //服務(wù)注銷
ch <- struct{}{}
}
}
return cancelFunc, nil
}
服務(wù)關(guān)閉時,除了不再接受新請求外,還需要考慮處理中的請求,不能因為服務(wù)關(guān)閉而強制中斷所有處理中的請求。根據(jù)請求所處階段不同,可以分別設(shè)置“擋板”,告知服務(wù)調(diào)用方當(dāng)前服務(wù)處于關(guān)閉流程,不再接受請求了。
func main() {
//...
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
srv.Shutdown()
}
demo/server/server.go
func (svr *RPCServer) Shutdown() {
//從服務(wù)注冊中心注銷
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//關(guān)閉當(dāng)前服務(wù)
if svr.listener != nil {
svr.listener.Shutdown()
}
}
provider/server.go
(1)首先是服務(wù)端接收到客戶端連接階段。如果此時發(fā)現(xiàn)服務(wù)關(guān)閉,設(shè)置擋板不再往下執(zhí)行,直接返回。
func (l *RPCListener) Run() error {
//... listen ...
++ go l.acceptConn() //accept conn
}
func (l *RPCListener) acceptConn() {
for {
conn, err := l.nl.Accept()
if err != nil {
select {
case <-l.getDoneChan(): //擋板:server closed done
return
default:
}
return
}
go l.handleConn(conn) //處理連接
}
}
type RPCListener struct {
++ doneChan chan struct{} //控制結(jié)束
}
func (l *RPCListener) getDoneChan() <-chan struct{} {
return l.doneChan
}
//關(guān)閉時關(guān)閉通道
func (l *RPCListener) Shutdown() {
l.closeDoneChan()
}
//關(guān)閉通道
func (l *RPCListener) closeDoneChan() {
select {
case <-l.doneChan:
default:
close(l.doneChan)
}
}
provider/listener.go
func (l *RPCListener) handleConn(conn net.Conn) {
//關(guān)閉擋板
++ if l.isShutdown() {
++ return
++ }
for {
++ if l.isShutdown() {
++ return
++ }
//handle ...
}
}
type RPCListener struct {
++ shutdown int32 //關(guān)閉處理中標(biāo)識位
}
//判斷是否關(guān)閉
func (l *RPCListener) isShutdown() bool {
return atomic.LoadInt32(&l.shutdown) == 1
}
//關(guān)閉邏輯
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
}
provider/listener.go
(3)最后請求已進入服務(wù)實際處理階段。此時無法簡單設(shè)置擋板了,因為已經(jīng)是處理中,就應(yīng)該將請求處理完成。但我們需要確認有多少處理中的請求,并且確保這些請求全部執(zhí)行完成,然后就可以安全退出了。這有點像 WaitGroup 計數(shù)器,我們也維護一個處理中任務(wù)計數(shù)來達到目的。
type RPCListener struct {
++ handlingNum int32 //處理中任務(wù)數(shù)
}
func (l *RPCListener) handleConn(conn net.Conn) {
//...
//處理中任務(wù)數(shù)+1
++ atomic.AddInt32(&l.handlingNum, 1)
//任意退出都會導(dǎo)致處理中任務(wù)數(shù)-1
++ defer atomic.AddInt32(&l.handlingNum, -1)
//read from network
//decode
//call local func
//encode
//send result
}
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
++ for {
++ if atomic.LoadInt32(&l.handlingNum) == 0 {
++ break
++ }
++ }
l.closeDoneChan()
}
provider/listener.go

客戶端改造
實現(xiàn)服務(wù)發(fā)現(xiàn)

客戶端通過 client_proxy 接入服務(wù)發(fā)現(xiàn),首先要在初始化時增加服務(wù)端的標(biāo)識(appId),通過服務(wù)注冊中心獲取該標(biāo)識對應(yīng)的實例列表。
func NewClientProxy(appId string, option Option, registry naming.Registry) ClientProxy {
cp := &RPCClientProxy{
option: option,
failMode: option.FailMode,
registry: registry,
}
servers, err := cp.discoveryService(context.Background(), appId)
if err != nil {
log.Fatal(err)
}
cp.servers = servers
cp.loadBalance = LoadBalanceFactory(option.LoadBalanceMode, cp.servers)
return cp
}
//獲取服務(wù)列表
func (cp *RPCClientProxy) discoveryService(ctx context.Context, appId string) ([]string, error) {
instances, ok := cp.registry.Fetch(ctx, appId)
if !ok {
return nil, errors.New("service not found")
}
var servers []string
for _, instance := range instances {
servers = append(servers, instance.Addrs...)
}
return servers, nil
}
consumer/client_proxy.go
實現(xiàn)負載均衡

type LoadBalanceMode int
const (
RandomBalance LoadBalanceMode = iota
RoundRobinBalance
WeightRoundRobinBalance
)
type LoadBalance interface {
Get() string
}
func LoadBalanceFactory(mode LoadBalanceMode, servers []string) LoadBalance {
switch mode {
case RandomBalance:
return newRandomBalance(servers)
case RoundRobinBalance:
return newRoundRobinBalance(servers)
default:
return newRandomBalance(servers)
}
}
consumer/loadbalance.go
type randomBalance struct {
servers []string
}
func newRandomBalance(servers []string) LoadBalance {
return &randomBalance{servers: servers}
}
func (b *randomBalance) Get() string {
rand.Seed(time.Now().Unix())
return b.servers[rand.Intn(len(b.servers))]
}
consumer/loadbalance.go
type roundRobinBalance struct {
servers []string
curIdx int
}
func newRoundRobinBalance(servers []string) LoadBalance {
return &roundRobinBalance{servers: servers, curIdx: 0}
}
func (b *roundRobinBalance) Get() string {
lens := len(b.servers)
if b.curIdx >= lens {
b.curIdx = 0
}
server := b.servers[b.curIdx]
b.curIdx = (b.curIdx + 1) % lens
return server
}
consumer/loadbalance.go
func (cp *RPCClientProxy) getConn() error {
addr := strings.Replace(cp.loadBalance.Get(), cp.option.NetProtocol+"://", "", -1)
err := cp.client.Connect(addr) //長連接管理
if err != nil {
return err
}
return nil
}
consumer/client_proxy.go
實現(xiàn)失敗策略
執(zhí)行調(diào)用階段還要考慮失敗策略,即在調(diào)用服務(wù)端過程中出錯后如何處理?這里出錯通常是網(wǎng)絡(luò)原因或是服務(wù)端程序異常產(chǎn)生,而非業(yè)務(wù)錯誤。
處理辦法可分為接受失敗或發(fā)起重試,接受失敗對應(yīng)策略就是 Failfast (快速失?。6卦嚳梢岳^續(xù)對上一次服務(wù)端地址發(fā)起調(diào)用 Failtry,它可以解決臨時性網(wǎng)絡(luò)失敗,但如果該實例服務(wù)端掛了再重試幾次也無濟于事,所以有另一個種策略 Failover,也就是故障轉(zhuǎn)移,換個服務(wù)端實例再試。

type FailMode int
const (
Failover FailMode = iota
Failfast
Failretry
)
consumer/fail.go
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ..
.interface{}) (interface{}, error) {
service, err := NewService(servicePath)
if err != nil {
return nil, err
}
err := cp.getConn()
if err != nil && cp.failMode == Failfast { //快速失敗
return nil, err
}
//失敗策略
switch cp.failMode {
case Failretry:
//...
case Failover:
//...
case Failfast:
//...
}
return nil, errors.New("call error")
}
consumer/client_proxy.go
switch cp.failMode {
case Failretry:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
}
case Failover:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
err = cp.getConn()
}
case Failfast:
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
return nil, err
}
consumer/client_proxy.go


總結(jié)與補充
這一版 RPC 框架具備了集群能力、負載均衡和簡單容錯能力,當(dāng)然離一個完善的微服務(wù)框架仍有不少距離,所以后續(xù)會陸續(xù)迭代,希望大家多多支持。
