编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Motan RPC 框架分析(rpc框架性能基本比较测试)

wxchong 2024-08-18 00:38:56 开源技术 22 ℃ 0 评论

一、框架简介

Motan 是新浪微博开源的一套高性能、易于使用的分布式远程服务调用(RPC)框架。

Motan 的核心模块交互关系如下:

二、核心模块介绍

2.0 SPI 机制

SPI 机制支持 JDK 的 ServiceProvider 机制并进行了扩展,接口的实现放在 META-INF/services/ 目录下以接口的完全类名命名的文件里,每个实现的完全类名占一行。

每个实现类可以加上 SpiMeta(name="implName") 来指定名称,ExtensionLoader 可以通过接口类型和命名从多个实现中找到指定实现。

2.1 register

用来和注册中心交互,包括服务注册、订阅服务、接收服务变更通知、发送心跳等功能。

// 服务发现功能的抽象
public interface DiscoveryService {
 // 订阅到注册中心
 void subscribe(URL url, NotifyListener listener);
 // 取消订阅
 void unsubscribe(URL url, NotifyListener listener);
 // 根据 url 描述的服务从注册中心获取该服务的所有提供者的信息
 List<URL> discover(URL url);
}
// 服务注册功能的抽象
public interface RegistryService {
 void register(URL url);
 void unregister(URL url);
 void available(URL url);
 void unavailable(URL url);
 Collection<URL> getRegisteredServiceUrls();
}
// 表示一个注册中心的抽象
@Spi(scope = Scope.SINGLETON)
public interface Registry extends RegistryService, DiscoveryService {
 // 获取该注册中心的描述信息
 URL getUrl();
}

2.2 protocol

用来进行 RPC 服务描述和配置管理,可以通过 Filter 进行扩展、功能增强。

@Spi(scope = Scope.SINGLETON)
public interface Protocol {
 // 暴露服务
 <T> Exporter<T> export(Provider<T> provider, URL url);
 // 调用端收到注册中心通知的服务提供者实例信息 serviceUrl 后,
 // 根据 serviceUrl 创建对指定实例的引用
 <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl);
 void destroy();
}
// Filter 用于进行功能扩展
@Spi
public interface Filter {
 Response filter(Caller<?> caller, Request request);
}
// 把 Filter 应用到 protocol 上,进行功能增强
public class ProtocolFilterDecorator implements Protocol {
 private Protocol protocol;
 public ProtocolFilterDecorator(Protocol protocol) {
 this.protocol = protocol;
 }
 @Override
 public <T> Exporter<T> export(Provider<T> provider, URL url) {
 // 对原始的 provider 进行包装增强
 return protocol.export(decorateWithFilter(provider, url), url);
 }
 @Override
 public <T> Referer<T> refer(Class<T> clz, URL url, URL serviceUrl) {
 // 对原始 protocol 生成的 Referer 进行包装增强,这样可以在调用前、后进行处理。
 return decorateWithFilter(protocol.refer(clz, url, serviceUrl), url);
 }
 @Override
 public void destroy() {
 protocol.destroy();
 }
 // 省略其它代码
}

2.3 serialize

此模块负责把请求、响应进行序列化和反序列化。默认采用 Hessian2 。

@Spi(scope=Scope.PROTOTYPE)
public interface Serialization {
 byte[] serialize(Object obj) throws IOException;
 <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException;
 byte[] serializeMulti(Object[] data) throws IOException;
 Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException;
 /**
 * serializaion的唯一编号,用于传输协议中指定序列化方式。每种序列化的编号必须唯一。
 * @return 由于编码规范限制,序列化方式最大支持32种,因此返回值必须在0-31之间。
 */
 int getSerializationNumber();
}

2.4 transport

实现远程通信,默认采用 Netty NIO 的 TCP 长链接实现。

2.5 cluster

仅供 client 端使用的模块。

ClusterSupport 收到注册中心的通知,根据这些服务提供者的信息生成一组 Referer 对象,然后用这组 Referer 通过 Cluster,Cluster 再用这些 Referer 刷新持有的 LoadBalance 。

构建 Referer 的时候会实例化 Client,以完成远程调用。Client 位于传输层,完成请求、响应的编码、解码工作,然后进行序列化、反序列化,再通过 Channel 进行网络传输。

二、构建客户端代理

创建服务的代理的过程:

1. 收集服务的描述信息 refUrl ;

2. 根据 refUrl 和 registryUrl 创建 ClusterSupport;

3. 根据 interfaceClass 和 ClusterSupport.getCluster() 创建代理对象;

4. 以代理对象 RefererInvocationHandler 为例,它持有 Cluster 就可以进行元旦调用。

public class DefaultRpcReferer<T> extends AbstractReferer<T> {
 protected Client client;
 protected EndpointFactory endpointFactory;
 public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
 super(clz, url, serviceUrl);
 endpointFactory =
 ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
 url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
 // 创建 Client
 client = endpointFactory.createClient(url);
 }
 @Override
 protected Response doCall(Request request) {
 try {
 // 为了能够实现跨group请求,需要使用server端的group。
 request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
 // 通过 Client 完成远程调用
 return client.request(request);
 } catch (TransportException exception) {
 throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
 }
 }
 // 省略其他代码 
}

三、服务端暴露服务

服务端对外暴露服务要考虑两点:

1. 一个网络端口有多个服务提供者,如何找到目标提供者?

在transport层实现,ProviderMessageRouter 相当于一个请求分发器,把请求分发调用目前提供者。分发是根据服务描述 URL 里的关键信息生成 serviceKey (组成: group + "/" + interfaceName + "/" + version)来确定提供者,再根据请求里的方法名、方法参数描述来定位要调用的 Method 。

public class ProviderMessageRouter implements MessageHandler {
 // serviceKey 到具体服务提供者的映射
 protected Map<String, Provider<?>> providers = new HashMap<>();
 public Object handle(Channel channel, Object message) {
 Request request = (Request) message;
 String serviceKey = MotanFrameworkUtil.getServiceKey(request);
 Provider<?> provider = providers.get(serviceKey);
 Method method = provider.lookupMethod(request.getMethodName(), request.getParamtersDesc());
 fillParamDesc(request, method);
 processLazyDeserialize(request, method);
 return call(request, provider);
 }
 protected Response call(Request request, Provider<?> provider) {
 try {
 return provider.call(request);
 } catch (Exception e) {
 DefaultResponse response = new DefaultResponse();
 response.setException(new MotanBizException("provider call process error", e));
 return response;
 }
 }
 public synchronized void addProvider(Provider<?> provider) {
 String serviceKey = MotanFrameworkUtil.getServiceKey(provider.getUrl());
 if (providers.containsKey(serviceKey)) {
 throw new MotanFrameworkException("provider alread exist: " + serviceKey);
 }
 providers.put(serviceKey, provider);
 }
}

2. 有了请求分发处理器后,就可以作为 Server 的消息处理器。

public class DefaultRpcExporter<T> extends AbstractExporter<T> {
 protected final ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter;
 protected final ConcurrentHashMap<String, Exporter<?>> exporterMap;
 protected Server server;
 protected EndpointFactory endpointFactory;
 public DefaultRpcExporter(Provider<T> provider, URL url, ConcurrentHashMap<String, ProviderMessageRouter> ipPort2RequestRouter,
 ConcurrentHashMap<String, Exporter<?>> exporterMap) {
 super(provider, url);
 this.exporterMap = exporterMap;
 this.ipPort2RequestRouter = ipPort2RequestRouter;
 // 初始化消息分发处理器
 ProviderMessageRouter requestRouter = initRequestRouter(url);
 endpointFactory =
 ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
 url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
 // 创建 Server
 server = endpointFactory.createServer(url, requestRouter);
 }
}
public abstract class AbstractEndpointFactory implements EndpointFactory {
 /** 维持share channel 的service列表 **/
 protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>();
 protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>();
 private EndpointManager heartbeatClientEndpointManager = null;
 public AbstractEndpointFactory() {
 // 心跳管理
 heartbeatClientEndpointManager = new HeartbeatClientEndpointManager();
 heartbeatClientEndpointManager.init();
 }
 public Server createServer(URL url, MessageHandler messageHandler) {
 messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);
 synchronized (ipPort2ServerShareChannel) {
 String ipPort = url.getServerPortStr();
 String protocolKey = MotanFrameworkUtil.getProtocolKey(url);
 boolean shareChannel =
 url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());
 if (!shareChannel) { // 独享一个端口
 LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);
 // 如果端口已经被使用了,使用该server bind 会有异常
 return innerCreateServer(url, messageHandler);
 }
 Server server = ipPort2ServerShareChannel.get(ipPort);
 if (server != null) {
 // 省略:无法共享 channel 的抛出异常。
 saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
 // 共享已经存在的 Server
 return server;
 }
 url = url.createCopy();
 url.setPath(""); // 共享server端口,由于有多个interfaces存在,所以把path设置为空
 // ipPort 上还没创建过 Server,创建一个新的
 server = innerCreateServer(url, messageHandler);
 ipPort2ServerShareChannel.put(ipPort, server);
 saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);
 return server;
 }
 }
}

四、调用过程实现

  1. Cluster 收到调用请求后,通过自身的 HA 策略进行调用;
  2. HA 策略实现根据传入 LB 获取到一个 Referer 实例进行调用;
  3. 根据 Referer 的调用结果来执行 HA 策略。
// ClusterSpi.java
public Response call(Request request) {
 if (available.get()) {
 try {
 return haStrategy.call(request, loadBalance);
 } catch (Exception e) {
 return callFalse(request, e);
 }
 }
 return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
@SpiMeta(name = "failfast")
public class FailfastHaStrategy<T> extends AbstractHaStrategy<T> {
 @Override
 public Response call(Request request, LoadBalance<T> loadBalance) {
 Referer<T> refer = loadBalance.select(request);
 return refer.call(request);
 }
}

4. Referer 通过持有的 Client 进行调用;

5. Client 调用时从 Channel 连接池获取到一个 Channel 后再进行写入请求;

6. Channel 首先给请求生成一个 ResponseFuture,注册到 Client 的 callbackMap 上,这样可以阻塞住调用线程,实现同步调用,也方便收到响应时找到对应的调用线程;

7. Channel 对于写入的请求根据 Client 在 Netty pipeline 上设置的 codec 组件进行编码、解码操作;

8. codec 会用 Serialization 组件进行序列化、反序列化;

9. 最终把序列化后的请求写入网络连接。

10. 服务端的网络监听接收到请求后,用 codec 组件进行反序列化、解码得到一个请求对象;

11. Server 的消息处理器 ProviderMessageRouter.handle(Channel channel, Object message) 方法里,根据请求信息得到要调用的 serviceKey, 从而找到对应的 provider ;

12. 再根据要调用的方法名、参数信息找到对应的实现方法,进行调用,得到响应;

13. 再把响应写回到网络连接里;

14. 客户端最终得到响应对象后,从 Client 维护的 callbackMap 找出请求 的 ResponseFuture 并设置结果,使调用线程可以返回。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表