raft-java分布式一致性算法 Raft 的 Java 實現(xiàn)
raft-java
Raft implementation library for Java.
參考自Raft論文和Raft作者的開源實現(xiàn)LogCabin。
支持的功能
leader選舉
日志復(fù)制
snapshot
集群成員動態(tài)更變
Quick Start
在本地單機上部署一套3實例的raft集群,執(zhí)行如下腳本:
cd raft-java-example && sh deploy.sh
該腳本會在raft-java-example/env目錄部署三個實例example1、example2、example3;
同時會創(chuàng)建一個client目錄,用于測試raft集群讀寫功能。
部署成功后,測試寫操作,通過如下腳本: cd env/client
./bin/run_client.sh "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" hello world
測試讀操作命令:
./bin/run_client.sh "127.0.0.1:8051,127.0.0.1:8052,127.0.0.1:8053" hello
使用方法
下面介紹如何在代碼中使用raft-java依賴庫來實現(xiàn)一套分布式存儲系統(tǒng)。
配置依賴
<dependency> <groupId>com.github.wenweihu86.raft</groupId> <artifactId>raft-java-core</artifactId> <version>1.8.0</version> </dependency>
定義數(shù)據(jù)寫入和讀取接口
message SetRequest {
string key = 1;
string value = 2;
}
message SetResponse {
bool success = 1;
}
message GetRequest {
string key = 1;
}
message GetResponse {
string value = 1;
}
public interface ExampleService {
Example.SetResponse set(Example.SetRequest request);
Example.GetResponse get(Example.GetRequest request);
}
服務(wù)端使用方法
實現(xiàn)狀態(tài)機StateMachine接口實現(xiàn)類
// 該接口三個方法主要是給Raft內(nèi)部調(diào)用
public interface StateMachine {
/**
* 對狀態(tài)機中數(shù)據(jù)進行snapshot,每個節(jié)點本地定時調(diào)用
* @param snapshotDir snapshot數(shù)據(jù)輸出目錄
*/
void writeSnapshot(String snapshotDir);
/**
* 讀取snapshot到狀態(tài)機,節(jié)點啟動時調(diào)用
* @param snapshotDir snapshot數(shù)據(jù)目錄
*/
void readSnapshot(String snapshotDir);
/**
* 將數(shù)據(jù)應(yīng)用到狀態(tài)機
* @param dataBytes 數(shù)據(jù)二進制
*/
void apply(byte[] dataBytes);
}
實現(xiàn)數(shù)據(jù)寫入和讀取接口
// ExampleService實現(xiàn)類中需要包含以下成員 private RaftNode raftNode; private ExampleStateMachine stateMachine;
// 數(shù)據(jù)寫入主要邏輯 byte[] data = request.toByteArray(); // 數(shù)據(jù)同步寫入raft集群 boolean success = raftNode.replicate(data, Raft.EntryType.ENTRY_TYPE_DATA); Example.SetResponse response = Example.SetResponse.newBuilder().setSuccess(success).build();
// 數(shù)據(jù)讀取主要邏輯,由具體應(yīng)用狀態(tài)機實現(xiàn) Example.GetResponse response = stateMachine.get(request);
服務(wù)端啟動邏輯
// 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 應(yīng)用狀態(tài)機 ExampleStateMachine stateMachine = new ExampleStateMachine(); // 設(shè)置Raft選項,比如: RaftOptions.snapshotMinLogSize = 10 * 1024; RaftOptions.snapshotPeriodSeconds = 30; RaftOptions.maxSegmentFileSize = 1024 * 1024; // 初始化RaftNode RaftNode raftNode = new RaftNode(serverList, localServer, stateMachine); // 注冊Raft節(jié)點之間相互調(diào)用的服務(wù) RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode); server.registerService(raftConsensusService); // 注冊給Client調(diào)用的Raft服務(wù) RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注冊應(yīng)用自己提供的服務(wù) ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine); server.registerService(exampleService); // 啟動RPCServer,初始化Raft節(jié)點 server.start(); raftNode.init();
評論
圖片
表情
