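Overview of the Protocol layer in Dubbo
Protocol layer overview
A protocol is a set of rules that multiple cooperating parties follow in order to accomplish some function.
Common communication protocols include the OSI seven-layer model, HTTP, TCP, and UDP.
OSI seven-layer model: application, presentation, session, transport, network, data link, and physical layers. It defines the top-level framework of network communication and specifies only interfaces [very abstract].
HTTP: brings to mind an application-layer protocol, URL format, HTTP headers, HTTP body, GET | POST ... [fairly concrete].
TCP: brings to mind a transport-layer protocol, the three-way handshake and four-way teardown, reliable delivery ... [fairly concrete].
Whatever the protocol, it specifies concrete rules for some (or all) stages of network communication.
The Dubbo framework offers many communication protocols to choose from; each one is a different route to implementing RPC.
Think of RPC as "travelling from city A to city B".
The available protocols are then the different means of transport from A to B.
Choosing a means of transport determines which ticket to buy, who the driver is, how comfortable the trip is, which route is taken ...
Likewise, each Dubbo protocol specifies (or defaults) the serialization format, long or short connections, the underlying protocol (HTTP | TCP), synchronous or asynchronous calls, the message-handling thread model ...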
Protocol in Dubbo
org.apache.dubbo.rpc.Protocol#export: the provider exposes an invoker object
Core function 1: start an xxxServer and expose the service-execution object [invoker] on the network
org.apache.dubbo.rpc.Protocol#refer: the consumer converts a URL into an invoker object
Core function 2: create an xxxClient and wrap it as a callable object [invoker] for the consumer
Simplified Protocol diagram

// The default RPC-layer protocol is DubboProtocol
@SPI("dubbo")
public interface Protocol {
// Called by the provider during service export
// invoker: the RPC interface implementation [impl] converted into an invoker object
// The impl-to-invoker conversion is done by org.apache.dubbo.rpc.ProxyFactory#getInvoker
// Provider-side execution chain: [.. -> Invoker#invoke -> Wrapper#invokeMethod -> Impl#xxx]
//------------------- end of invoker notes ------------------------
// Example: exporting over the http protocol starts an HTTP server and exposes the interface implementation as an HTTP service
// Example export URL: http://127.0.0.1:49178/org.apache.dubbo.rpc.protocol.http.HttpService?version=1.0.0
// Example: exporting over the dubbo protocol starts a Netty server by default
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
// Executed by the consumer during service reference
// Builds an invoker object from the interface type and the remote service URL
// Consumer-side call chain: rpcInterface#xxx -> proxy#xxx -> invoker#invoke -> nettyClient#send [default]
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
。。。
}
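To make the two core functions concrete, here is a minimal in-memory sketch of how export and refer pair up. The types `Invoker`, `Exporter`, and `InMemoryProtocolSketch` are simplified stand-ins invented for this illustration, not Dubbo's real interfaces; a plain string key stands in for the URL, and no server or client is actually started.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified stand-ins for illustration only; these are NOT Dubbo's real types.
public class InMemoryProtocolSketch {

    /** Hypothetical invoker: takes an argument and returns a result. */
    public interface Invoker extends Function<String, String> {}

    /** Hypothetical exporter handle that can withdraw the service again. */
    public interface Exporter {
        void unexport();
    }

    // Exported invokers, keyed by a service key (stands in for the URL).
    private final Map<String, Invoker> exported = new ConcurrentHashMap<>();

    /** Provider side: make the invoker reachable under a service key. */
    public Exporter export(String serviceKey, Invoker invoker) {
        exported.put(serviceKey, invoker);
        return () -> exported.remove(serviceKey);
    }

    /** Consumer side: turn a service key into a callable invoker. */
    public Invoker refer(String serviceKey) {
        return arg -> {
            Invoker target = exported.get(serviceKey);
            if (target == null) {
                throw new IllegalStateException("No provider for " + serviceKey);
            }
            return target.apply(arg);
        };
    }

    public static void main(String[] args) {
        InMemoryProtocolSketch protocol = new InMemoryProtocolSketch();
        Exporter exporter = protocol.export("demo.GreetingService", name -> "hello " + name);
        Invoker consumerSide = protocol.refer("demo.GreetingService");
        System.out.println(consumerSide.apply("dubbo"));
        exporter.unexport();
    }
}
```

In a real protocol the map lookup inside `refer` is replaced by a network round trip, which is where the choice of server, client, and serialization comes in.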
AbstractProtocol and its protocol subclasses all live in the dubbo-rpc module
DubboProtocol, RedisProtocol, and MemcachedProtocol extend AbstractProtocol directly
The other common protocols in dubbo-rpc extend AbstractProxyProtocol
RegistryProtocol handles the service registration and subscription stage [not covered in this article]
ProtocolFilterWrapper and ProtocolListenerWrapper are protocol wrapper [proxy] classes
public abstract class AbstractProtocol implements Protocol {
// Map of exported services
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// Wrap the asynchronous invoker so that it can be called synchronously
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
// Subclasses implement this method to perform the actual service reference
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
。。。
}
AbstractProtocol defines the exporterMap field that holds exported services
It also defines a new abstract method for service reference: protocolBindingRefer
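The shape of AbstractProtocol#refer is a template method: the base class fixes the wrapping step and subclasses only supply the binding. The sketch below shows that shape with `CompletableFuture`. All names (`AsyncInvoker`, `BaseProtocol`, `EchoProtocol`) are hypothetical, and it assumes the wrapper always waits for the result with a one-second timeout, which is a simplification rather than a description of what Dubbo's AsyncToSyncInvoker does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified stand-ins for illustration only; NOT Dubbo's real classes.
public class AsyncToSyncSketch {

    /** Hypothetical invoker whose result may complete later. */
    public interface AsyncInvoker {
        CompletableFuture<String> invoke(String request);
    }

    /** Base class: refer() is fixed, subclasses only supply the binding. */
    public abstract static class BaseProtocol {
        public final AsyncInvoker refer(String url) {
            AsyncInvoker delegate = bindingRefer(url);
            // Wrap the delegate so that callers always receive a completed result.
            return request -> {
                CompletableFuture<String> future = delegate.invoke(request);
                try {
                    future.get(1, TimeUnit.SECONDS); // assumed timeout for this sketch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                } catch (ExecutionException | TimeoutException e) {
                    throw new IllegalStateException("Invocation failed", e);
                }
                return future;
            };
        }

        /** The only method a concrete protocol has to implement. */
        protected abstract AsyncInvoker bindingRefer(String url);
    }

    /** A subclass that answers on another thread. */
    public static class EchoProtocol extends BaseProtocol {
        @Override
        protected AsyncInvoker bindingRefer(String url) {
            return request -> CompletableFuture.supplyAsync(() -> url + " echoed " + request);
        }
    }

    public static void main(String[] args) {
        AsyncInvoker invoker = new EchoProtocol().refer("echo://demo");
        CompletableFuture<String> result = invoker.invoke("ping");
        System.out.println(result.isDone() + " " + result.join());
    }
}
```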
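Protocol categories in the dubbo-rpc module
Proxy protocols
AbstractProxyProtocol
Core function: converting between the RPC interface proxy object and the invoker object [after reading the code, the class name turns out to be quite apt]
The parameter and return types of Protocol's export and refer differ from those of doExport and doRefer in AbstractProxyProtocol subclasses, so a conversion is required
export and refer in AbstractProxyProtocol call the subclass's doExport and doRefer and convert between the proxy [interface proxy object] and the invoker object
Could an AbstractProxyProtocol subclass simply accept or return the same types as Protocol's export and refer, or do the conversion itself? Answer: it cannot accept or return the same types directly [see the subclass implementations below]; a subclass could convert internally, but then every subclass would repeat the conversion, so the shared logic is lifted into the parent class
[DubboProtocol's export and refer methods perform one conversion fewer than AbstractProxyProtocol]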
public abstract class AbstractProxyProtocol extends AbstractProtocol {
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
// Proxy factory
protected ProxyFactory proxyFactory;
public AbstractProxyProtocol() {
}
public AbstractProxyProtocol(Class<?>... exceptions) {
for (Class<?> exception : exceptions) {
addRpcException(exception);
}
}
public ProxyFactory getProxyFactory() {
return proxyFactory;
}
public void setProxyFactory(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
// Subclasses implementing this export method receive the RPC interface implementation impl [or a proxy of it]
// whereas the Protocol interface method is: <T> Exporter<T> export(Invoker<T> invoker);
// so this proxy protocol has to convert between the invoker and the impl proxy
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
// Subclasses implementing this reference method return an RPC interface proxy
// whereas the Protocol interface method is: <T> Invoker<T> refer(Class<T> type, URL url);
// so this proxy protocol has to convert between the invoker and the RPC interface proxy
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
// When modifying the configuration through override, you need to re-expose the newly modified service.
if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
return exporter;
}
}
// During export, the invoker is converted into an interface proxy object, and that proxy is what gets exposed
// [DubboProtocol does not perform this conversion]
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
super.unexport();
exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
exporterMap.put(uri, exporter);
return exporter;
}
@Override
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
// During reference, there is one extra conversion from the interface proxy to an invoker
// [DubboProtocol does not perform this conversion]
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = target.invoke(invocation);
// FIXME result is an AsyncRpcResult instance.
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
invokers.add(invoker);
return invoker;
}
。。。
}
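The two conversions that AbstractProxyProtocol delegates to its proxyFactory can be sketched with the JDK's dynamic proxies and reflection. This is a rough illustration under stated assumptions: `Invoker`, `GreetingService`, and the two static helpers are hypothetical and far simpler than Dubbo's ProxyFactory, and methods inherited from `Object` (such as `toString`) are deliberately not handled.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Simplified stand-ins for illustration only; NOT Dubbo's ProxyFactory or Invoker.
public class ProxyInvokerSketch {

    /** Hypothetical invoker: one generic entry point for every method. */
    public interface Invoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Exception;
    }

    /** Sample RPC interface used by the demo. */
    public interface GreetingService {
        String greet(String name);
    }

    /** impl -> invoker: dispatch the generic call to the object by reflection. */
    public static <T> Invoker getInvoker(T impl, Class<T> type) {
        return (methodName, parameterTypes, args) -> {
            Method method = type.getMethod(methodName, parameterTypes);
            try {
                return method.invoke(impl, args);
            } catch (InvocationTargetException e) {
                // Surface the exception thrown by the target method itself.
                Throwable cause = e.getCause();
                throw cause instanceof Exception ? (Exception) cause : e;
            }
        };
    }

    /** invoker -> proxy: an object implementing the interface that forwards to the invoker. */
    public static <T> T getProxy(Invoker invoker, Class<T> type) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> invoker.invoke(method.getName(), method.getParameterTypes(), args));
        return type.cast(proxy);
    }

    public static void main(String[] args) throws Exception {
        GreetingService impl = name -> "hello " + name;
        // Export direction in a proxy protocol: invoker -> proxy handed to the third-party server.
        Invoker invoker = getInvoker(impl, GreetingService.class);
        GreetingService proxy = getProxy(invoker, GreetingService.class);
        System.out.println(proxy.greet("dubbo"));
        // Refer direction: a proxy returned by the third-party client -> invoker.
        Invoker roundTrip = getInvoker(proxy, GreetingService.class);
        System.out.println(roundTrip.invoke("greet", new Class<?>[] {String.class}, new Object[] {"again"}));
    }
}
```

Because every proxy-style protocol needs exactly these two steps, keeping them in the parent class leaves doExport and doRefer free to deal only with the third-party server and client.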
Categories by function
Single connection, long-lived connection, NIO asynchronous transport, TCP
DubboProtocol [non-proxy protocol]
Service export calls Exchangers.bind(url, requestHandler) [passing requestHandler]
Service reference calls Exchangers.connect(url, requestHandler) [passing requestHandler]
requestHandler: the request handler; it also handles callback requests [the server calling back to the client]
None of the other protocol classes in the Dubbo rpc modules appear to handle callbacks
Neither export nor reference needs to create an RPC interface proxy object [a structural difference from the AbstractProxyProtocol subclasses]
Exchangers selects the underlying Java NIO framework [netty | mina | grizzly]
Export uses a Netty server by default, while reference could use, for example, a Mina client [configurable: netty | mina | grizzly]
The default serialization is hessian2, with a Dubbo-specific packet format
By default the referencing side creates a single long-lived connection
public class DubboProtocol extends AbstractProtocol {
// Request handler
// Callback requests are handled here as well
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// Check whether this is a callback invocation
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
。。。
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
。。。
};
// The invoker object is exported directly
// No interface proxy object is generated [unlike AbstractProxyProtocol]
// openServer(url) registers requestHandler to process all incoming requests
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// Start the server
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// serverMap is a field of AbstractProtocol
// It serves as a cache of servers, keyed by address,
// and keeps export idempotent: one server per address
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// Pass requestHandler as the handler for incoming request messages
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
// Implements the service-reference method declared in AbstractProtocol
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
// Create the clients
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
// Normally a referenced service uses a single shared client [long-lived TCP connection]
shareClients = getSharedClient(url, connections);
}
// The connections parameter controls how many clients [long-lived TCP connections] are started
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// Creating a client also requires a requestHandler
// Here requestHandler processes callback requests [the provider calling back to the consumer]
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
。。。。。
}
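The caching behaviour of openServer (one server per address, reused and reset on later exports) can be sketched as follows. The original uses double-checked locking around a map; this sketch swaps in `ConcurrentHashMap.computeIfAbsent` to get the same create-once guarantee, and the `Server` class with its reset counter is a hypothetical stand-in, not Dubbo's ProtocolServer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-ins for illustration only; NOT Dubbo's server classes.
public class ServerCacheSketch {

    /** Hypothetical server handle bound to one address. */
    public static final class Server {
        private final String address;
        private final AtomicInteger resets = new AtomicInteger();

        Server(String address) {
            this.address = address;
        }

        /** Stands in for re-applying changed configuration to a running server. */
        void reset() {
            resets.incrementAndGet();
        }

        public String address() {
            return address;
        }

        public int resetCount() {
            return resets.get();
        }
    }

    // Cache of servers keyed by address.
    private final Map<String, Server> serverMap = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Creates the server for an address once; later calls reuse and reset it. */
    public Server openServer(String address) {
        boolean[] createdNow = {false};
        Server server = serverMap.computeIfAbsent(address, key -> {
            createdNow[0] = true;
            created.incrementAndGet();
            return new Server(key);
        });
        if (!createdNow[0]) {
            server.reset();
        }
        return server;
    }

    public int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        Server first = protocol.openServer("127.0.0.1:20880");
        Server second = protocol.openServer("127.0.0.1:20880");
        System.out.println(first == second);
        System.out.println(protocol.createdCount());
        System.out.println(second.resetCount());
    }
}
```

Exporting several services on the same host and port therefore shares one listening server, which is what makes repeated export calls safe.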
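Multiple connections, short-lived connections, synchronous transport, HTTP | TCP
HttpProtocol, HessianProtocol, RestProtocol, RmiProtocol, WebServiceProtocol, XmlRpcProtocol [proxy protocol subclasses]
Each protocol class creates and starts a server, creates a client, and specifies a request handler
Both server and client are built on open-source components
When creating the server or client object, each proxy subclass depends on the interface implementation [or an interface proxy implementation]
The export and refer methods of the Protocol interface take or return an invoker object
The export and refer methods of AbstractProxyProtocol convert between the interface proxy object and the invoker
A general-purpose serialization format is used [JSON or Hessian]
RmiProtocol runs over TCP; the other protocols run over HTTP
Dubbo uses Jetty as its default HTTP web server
No callback handling
Source code example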
public class HttpProtocol extends AbstractProxyProtocol {
public static final String ACCESS_CONTROL_ALLOW_ORIGIN_HEADER = "Access-Control-Allow-Origin";
public static final String ACCESS_CONTROL_ALLOW_METHODS_HEADER = "Access-Control-Allow-Methods";
public static final String ACCESS_CONTROL_ALLOW_HEADERS_HEADER = "Access-Control-Allow-Headers";
private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();
private HttpBinder httpBinder;
public HttpProtocol() {
super(HttpException.class, JsonRpcClientException.class);
}
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
@Override
public int getDefaultPort() {
return 80;
}
// Message handler
private class InternalHandler implements HttpHandler {
private boolean cors;
public InternalHandler(boolean cors) {
this.cors = cors;
}
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws ServletException {
String uri = request.getRequestURI();
JsonRpcServer skeleton = skeletonMap.get(uri);
if (cors) {
response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
}
if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
response.setStatus(200);
} else if (request.getMethod().equalsIgnoreCase("POST")) {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
// skeleton: com.googlecode.jsonrpc4j.JsonRpcServer
// Uses JSON serialization
skeleton.handle(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
} else {
response.setStatus(500);
}
}
}
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
ProtocolServer protocolServer = serverMap.get(addr);
if (protocolServer == null) {
// Register the message handler
RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
serverMap.put(addr, new ProxyProtocolServer(remotingServer));
}
final String path = url.getAbsolutePath();
final String genericPath = path + "/" + GENERIC_KEY;
// Create the server-side skeleton
JsonRpcServer skeleton = new JsonRpcServer(impl, type);
// com.googlecode.jsonrpc4j.JsonRpcServer must be given the RPC interface implementation [or a proxy of it]
JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
skeletonMap.put(path, skeleton);
skeletonMap.put(genericPath, genericServer);
return () -> {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
};
}
@SuppressWarnings("unchecked")
@Override
protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
final String generic = url.getParameter(GENERIC_KEY);
final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
/********* Proxy creation via com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean [rpc-client] **************/
JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
if (isGeneric) {
invocation.addAttribute(GENERIC_KEY, generic);
}
return invocation;
});
String key = url.setProtocol("http").toIdentityString();
if (isGeneric) {
key = key + "/" + GENERIC_KEY;
}
jsonRpcProxyFactoryBean.setServiceUrl(key);
// Set the service interface
jsonRpcProxyFactoryBean.setServiceInterface(serviceType);
jsonProxyFactoryBean.afterPropertiesSet();
// Create and return the client-side proxy, which implements the RPC interface
// Protocol#refer returns an Invoker
// AbstractProxyProtocol converts this proxy into an Invoker object
// [DubboProtocol's reference method does not create an interface proxy]
return (T) jsonProxyFactoryBean.getObject();
/********* Proxy creation via com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean [rpc-client] **************/
}
。。。
}
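Stripped of the servlet and JSON-RPC details, the export side of HttpProtocol is a map from request path to a per-service handler, plus a cleanup action returned to the caller. The sketch below models only that dispatch. `Response`, the `UnaryOperator<String>` handlers, and the 404 answer for unknown paths are assumptions of this illustration; the original code does not show what happens for an unregistered path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Simplified stand-ins for illustration only; no real HTTP server or JSON-RPC library is involved.
public class PathDispatchSketch {

    /** Hypothetical response: a status code plus a body. */
    public static final class Response {
        public final int status;
        public final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // One handler ("skeleton") per exported path.
    private final Map<String, UnaryOperator<String>> skeletonMap = new ConcurrentHashMap<>();

    /** Registers a handler for a path and returns the action that removes it again. */
    public Runnable export(String path, UnaryOperator<String> skeleton) {
        skeletonMap.put(path, skeleton);
        return () -> skeletonMap.remove(path);
    }

    /** Dispatches a request to the handler registered for its path. */
    public Response handle(String method, String path, String requestBody) {
        if ("OPTIONS".equalsIgnoreCase(method)) {
            return new Response(200, "");
        }
        if (!"POST".equalsIgnoreCase(method)) {
            return new Response(500, "");
        }
        UnaryOperator<String> skeleton = skeletonMap.get(path);
        if (skeleton == null) {
            // Assumption of this sketch: unknown paths answer 404.
            return new Response(404, "");
        }
        return new Response(200, skeleton.apply(requestBody));
    }

    public static void main(String[] args) {
        PathDispatchSketch server = new PathDispatchSketch();
        Runnable unexport = server.export("/demo.GreetingService", body -> "hello " + body);
        System.out.println(server.handle("POST", "/demo.GreetingService", "dubbo").body);
        System.out.println(server.handle("GET", "/demo.GreetingService", "dubbo").status);
        unexport.run();
        System.out.println(server.handle("POST", "/demo.GreetingService", "dubbo").status);
    }
}
```

The returned `Runnable` plays the same role as the one doExport hands back: AbstractProxyProtocol runs it when the service is unexported.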
HessianProtocol source
public class HessianProtocol extends AbstractProxyProtocol {
private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();
private HttpBinder httpBinder;
public HessianProtocol() {
super(HessianException.class);
}
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
@Override
public int getDefaultPort() {
return 80;
}
// Message handler
private class HessianHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
// com.caucho.hessian.server.HessianSkeleton
HessianSkeleton skeleton = skeletonMap.get(uri);
if (!"POST".equalsIgnoreCase(request.getMethod())) {
response.setStatus(500);
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
Enumeration<String> enumeration = request.getHeaderNames();
while (enumeration.hasMoreElements()) {
String key = enumeration.nextElement();
if (key.startsWith(DEFAULT_EXCHANGER)) {
RpcContext.getContext().setAttachment(key.substring(DEFAULT_EXCHANGER.length()),
request.getHeader(key));
}
}
try {
skeleton.invoke(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
ProtocolServer protocolServer = serverMap.get(addr);
if (protocolServer == null) {
// Register the message handler
// and create the server
RemotingServer remotingServer = httpBinder.bind(url, new HessianHandler());
serverMap.put(addr, new ProxyProtocolServer(remotingServer));
}
final String path = url.getAbsolutePath();
// Core provider-side class of the Hessian framework: com.caucho.hessian.server.HessianSkeleton
// HessianSkeleton must be given the RPC interface implementation [or a proxy of it]
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);
final String genericPath = path + "/" + GENERIC_KEY;
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));
return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
String generic = url.getParameter(GENERIC_KEY);
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
if (isGeneric) {
RpcContext.getContext().setAttachment(GENERIC_KEY, generic);
url = url.setPath(url.getPath() + "/" + GENERIC_KEY);
}
/********* Proxy creation via com.caucho.hessian.client.HessianProxyFactory [rpc-client] **************/
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
boolean isHessian2Request = url.getParameter(HESSIAN2_REQUEST_KEY, DEFAULT_HESSIAN2_REQUEST);
hessianProxyFactory.setHessian2Request(isHessian2Request);
boolean isOverloadEnabled = url.getParameter(HESSIAN_OVERLOAD_METHOD_KEY, DEFAULT_HESSIAN_OVERLOAD_METHOD);
hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
String client = url.getParameter(CLIENT_KEY, DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
HessianConnectionFactory factory = new HttpClientConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
} else if (client != null && client.length() > 0 && !DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
} else {
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
}
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
// Create and return the client-side proxy, which implements the RPC interface
// Protocol#refer returns an Invoker
// AbstractProxyProtocol converts this proxy into an Invoker object
// [DubboProtocol's reference method does not create an interface proxy]
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
/********* Proxy creation via com.caucho.hessian.client.HessianProxyFactory [rpc-client] **************/
}
。。。
}
Reference only, no export
RedisProtocol, MemcachedProtocol [non-proxy protocols]
Both rely on a third-party [cache] server
The consumer can call only three methods, get(k), set(k, v), and delete(k), to interact with redis | memcached
These protocols merely wrap the basic cache operations, so their practical value seems limited
RedisProtocol source [MemcachedProtocol is similar]
public class RedisProtocol extends AbstractProtocol {
public static final int DEFAULT_PORT = 6379;
。。。
@Override
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
try {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0) {
config.setMaxIdle(url.getParameter("max.idle", 0));
}
if (url.getParameter("min.idle", 0) > 0) {
config.setMinIdle(url.getParameter("min.idle", 0));
}
if (url.getParameter("max.active", 0) > 0) {
config.setMaxTotal(url.getParameter("max.active", 0));
}
if (url.getParameter("max.total", 0) > 0) {
config.setMaxTotal(url.getParameter("max.total", 0));
}
if (url.getParameter("max.wait", 0) > 0) {
config.setMaxWaitMillis(url.getParameter("max.wait", 0));
}
if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
}
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
}
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
}
final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0));
final int expiry = url.getParameter("expiry", 0);
// The method name mapped to get is configurable
final String get = url.getParameter("get", "get");
// The method name mapped to set is configurable
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
// The method name mapped to delete is configurable
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
if (get.equals(invocation.getMethodName())) {
// Reject calls that are not of the form get(k)
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
return AsyncRpcResult.newDefaultAsyncResult(oin.readObject(), invocation);
} else if (set.equals(invocation.getMethodName())) {
// Reject calls that are not of the form set(k, v)
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
jedis.set(key, output.toByteArray());
if (expiry > 1000) {
jedis.expire(key, expiry / 1000);
}
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (delete.equals(invocation.getMethodName())) {
// Reject calls that are not of the form delete(k)
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// All other methods are unsupported
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof JedisConnectionException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
} else if (t instanceof JedisDataException) {
re.setCode(RpcException.SERIALIZATION_EXCEPTION);
}
throw re;
} finally {
if (jedis != null) {
try {
jedis.close();
} catch (Throwable t) {
logger.warn("returnResource error: " + t.getMessage(), t);
}
}
}
}
@Override
public void destroy() {
super.destroy();
try {
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
}
}
}
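The invoker built by RedisProtocol boils down to dispatching on the method name, checking the argument count, and calling the matching cache operation. A minimal sketch of that dispatch follows. It assumes an in-memory `HashMap` in place of Redis, so there is no serialization, expiry, or connection pooling, and the class is not thread-safe; `CacheInvokerSketch` and its constructor parameters are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for illustration only; a HashMap replaces the Redis server.
public class CacheInvokerSketch {

    private final Map<String, Object> store = new HashMap<>();
    private final String getName;
    private final String setName;
    private final String deleteName;

    /** The three method names are configurable, e.g. "get"/"put"/"remove" for a Map-style interface. */
    public CacheInvokerSketch(String getName, String setName, String deleteName) {
        this.getName = getName;
        this.setName = setName;
        this.deleteName = deleteName;
    }

    /** Dispatches on the method name; anything else is unsupported. */
    public Object invoke(String methodName, Object... args) {
        if (getName.equals(methodName)) {
            requireArgCount(methodName, args, 1);
            return store.get(String.valueOf(args[0]));
        } else if (setName.equals(methodName)) {
            requireArgCount(methodName, args, 2);
            store.put(String.valueOf(args[0]), args[1]);
            return null;
        } else if (deleteName.equals(methodName)) {
            requireArgCount(methodName, args, 1);
            store.remove(String.valueOf(args[0]));
            return null;
        }
        throw new UnsupportedOperationException("Unsupported method " + methodName);
    }

    private static void requireArgCount(String methodName, Object[] args, int expected) {
        if (args.length != expected) {
            throw new IllegalArgumentException(
                    "Method " + methodName + " expects " + expected + " argument(s), got " + args.length);
        }
    }

    public static void main(String[] args) {
        CacheInvokerSketch cache = new CacheInvokerSketch("get", "put", "remove");
        cache.invoke("put", "k", "v");
        System.out.println(cache.invoke("get", "k"));
        cache.invoke("remove", "k");
        System.out.println(cache.invoke("get", "k"));
    }
}
```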
Protocols that integrate third-party RPC frameworks
GrpcProtocol, ThriftProtocol [proxy protocol subclasses]
[Not studied yet; some background for now]
gRPC is a high-performance, open-source, general-purpose RPC framework designed for mobile and HTTP/2.
Thrift is an interface description language and binary communication protocol used to define and create cross-language services. It is used as a remote procedure call (RPC) framework and was developed at Facebook for "scalable cross-language services development".
Non-RPC-layer protocols
QosProtocolWrapper: used for service health checks and quality probing
RegistryProtocol: used during registration; analyzed in detail in the service registration article
ProtocolFilterWrapper: the filter wrapper for protocols; see Dubbo filter loading and the extension Wrapper mechanism
