自己動手從0開始實現(xiàn)一個分布式RPC框架

服務提供方 Serivce Provider
服務消費方 Servce Consumer
注冊中心 Registery


消息定長,例如每個報文的大小為固定長度100字節(jié),如果不夠用空格補足。
在包尾特殊結束符進行分割。
將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+| BYTE | | | | | | | ........+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+| magic | version| type | content lenth | content byte[] | |+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+
第一個字節(jié)是魔法數(shù),比如我定義為0X35。
第二個字節(jié)代表協(xié)議版本號,以便對協(xié)議進行擴展,使用不同的協(xié)議解析器。
第三個字節(jié)是請求類型,如0代表請求1代表響應。
第四個字節(jié)表示消息長度,即此四個字節(jié)后面此長度的內(nèi)容是消息content。










一是在應用的Spring Context初始化完成事件時觸發(fā),掃描所有的Bean,將Bean中帶有OrcRpcConsumer注解的field獲取到,然后創(chuàng)建field類型的代理對象,創(chuàng)建完成后,將代理對象set給此field。后續(xù)就通過該代理對象創(chuàng)建服務端連接,并發(fā)起調(diào)用。
二是通過Spring的BeanFactoryPostProcessor,其可以對bean的定義BeanDefinition(配置元數(shù)據(jù))進行處理;Spring IOC會在容器實例化任何其他bean之前運行BeanFactoryPostProcessor讀取BeanDefinition,可以修改這些BeanDefinition,也可以新增一些BeanDefinition。

public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)throws BeansException {this.beanFactory = beanFactory;postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry)beanFactory);}private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry beanDefinitionRegistry) {String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();int len = beanDefinitionNames.length;for (int i = 0; i < len; i++) {String beanDefinitionName = beanDefinitionNames[i];BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (beanClassName != null) {Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader);ReflectionUtils.doWithFields(clazz, new FieldCallback() {public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {parseField(field);}});}}Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator();while (it.hasNext()) {Entry<String, BeanDefinition> entry = it.next();if (context.containsBean(entry.getKey())) {throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey());}beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("register OrcRpcConsumerBean definition: {}", entry.getKey());}}private void parseField(Field field) {// 獲取所有OrcRpcConsumer注解OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class);if (orcRpcConsumer != null) {// 使用field的類型和OrcRpcConsumer注解一起生成BeanDefinitionOrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder = new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer);BeanDefinition beanDefinition = beanDefinitionBuilder.build();beanDefinitions.put(field.getName(), beanDefinition);}}
public class JdkProxyFactory implements ProxyFactory{public Object getProxy(ServiceMetadata serviceMetadata) {return Proxy.newProxyInstance(serviceMetadata.getClazz().getClassLoader(), new Class[] {serviceMetadata.getClazz()},new ClientInvocationHandler(serviceMetadata));}private class ClientInvocationHandler implements InvocationHandler {private ServiceMetadata serviceMetadata;public ClientInvocationHandler(ServiceMetadata serviceMetadata) {this.serviceMetadata = serviceMetadata;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String serviceId = ServiceUtils.getServiceId(serviceMetadata);// 通過負載均衡器選取一個服務提供方地址ServiceURL service = InvocationServiceSelector.select(serviceMetadata);OrcRpcRequest request = new OrcRpcRequest();request.setMethod(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);request.setRequestId(UUID.randomUUID().toString());request.setServiceId(serviceId);OrcRpcResponse response = InvocationClientContainer.getInvocationClient(service.getServerNet()).invoke(request, service);if (response.getStatus() == RpcStatusEnum.SUCCESS) {return response.getData();} else if (response.getException() != null) {throw new OrcRpcException(response.getException().getMessage());} else {throw new OrcRpcException(response.getStatus().name());}}}}





+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+| BYTE | | | | | | | ........+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+| magic | version| type | content lenth | content byte[] | |+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+
第一個字節(jié)是魔法數(shù)定義為0X35。
第二個字節(jié)代表協(xié)議版本號。
第三個字節(jié)是請求類型,0代表請求1代表響應。
第四個字節(jié)表示消息長度,即此四個字節(jié)后面此長度的內(nèi)容是消息content。
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf)throws Exception {// 寫入?yún)f(xié)議頭byteBuf.writeByte(ProtocolConstant.MAGIC);// 寫入版本byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);// 寫入請求類型byteBuf.writeByte(protocolMsg.getMsgType());// 寫入消息長度byteBuf.writeInt(protocolMsg.getContent().length);// 寫入消息內(nèi)容byteBuf.writeBytes(protocolMsg.getContent());}
/*** 協(xié)議開始的標志 magic + version + type + length 占據(jù)7個字節(jié)*/public final int BASE_LENGTH = 7;@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)throws Exception {// 可讀字節(jié)小于基本長度,無法解析出payload長度,返回if (byteBuf.readableBytes() < BASE_LENGTH) {return;}// 記錄包頭開始的indexint beginIndex;while (true) {// 記錄包頭開始的indexbeginIndex = byteBuf.readerIndex();// 標記包頭開始的indexbyteBuf.markReaderIndex();// 讀到了協(xié)議頭魔數(shù),結束循環(huán)if (byteBuf.readByte() == ProtocolConstant.MAGIC) {break;}// 未讀到包頭,略過一個字節(jié)// 每次略過一個字節(jié),去讀取包頭信息的開始標記byteBuf.resetReaderIndex();byteBuf.readByte();/*** 當略過,一個字節(jié)之后,數(shù)據(jù)包的長度,又變得不滿足* 此時結束。等待后面的數(shù)據(jù)到達*/if (byteBuf.readableBytes() < BASE_LENGTH) {return;}}// 讀取版本號byte version = byteBuf.readByte();// 讀取消息類型byte type = byteBuf.readByte();// 讀取消息長度int length = byteBuf.readInt();// 判斷本包是否完整if (byteBuf.readableBytes() < length) {// 還原讀指針byteBuf.readerIndex(beginIndex);return;}byte[] data = new byte[length];byteBuf.readBytes(data);ProtocolMsg msg = new ProtocolMsg();msg.setMsgType(type);msg.setContent(data);list.add(msg);}


推薦閱讀:
《微服務中的灰度發(fā)布是什么?實現(xiàn)思路又是什么?》
聊技術,不止于技術。
