RPC 白银时代—— Alibaba Dubbo
前面讲到的系统通信架构和分布式应用一直存在一定的矛盾,于是架构团队开始着手思考解决的思路。
- 在现有的RPC模块的基础上进行改造,增加服务注册、发现、服务调度以及客户端自负载等服务治理相关的功能
- 上面的方案有更好的实现阿里巴巴开源的 Dubbo 框架
- 在中间的负载均衡层也就是 Nginx 层添加服务动态注册、限流等功能,同时也可以将灰度发布服务熔断等功能加进去,形成一个全面的服务治理网关
- 上面的方案有可以借鉴的 lua-nginx-module 也就是现在的 Openresty
自己造轮子,可能是每一个工程师都会不自然地作出的选择,在自主可控之外,还意味着一展拳脚的机会。加上现有的产品线应用已经非常广泛,也如前面说述我们的RPC模块其实上逻辑也非常清晰。可以选择在 Service Exporter 生成时,向注册中心(数据库或者Redis)注册服务、并周期性发送心跳进行活性探测,在生成 Service Accessor 代理的时候从注册中心按照系统 + 服务名称获取对应服务的可用节点列表,再进行自负载均衡,直接连接到对应的服务节点获取服务。其他服务治理也可以通过对注册中心进行管理配合 Exporter 和 Accessor 来完成,这样对现有系统也可以提供平滑的升级。
采用 lua-nginx-module,这种做法也需要对 Service Exporter 进行服务注册功能的改造,但对于调用方没有侵入性,并且移动端和外部系统也可以接入到这个服务中。当时我们也有同事在做基于 Kong(也是基于 Nginx + lua-nginx-module 的二次开发产品)的安全网关功能,将服务的认证、限流、WAF等等功能集成在一起进行统一的管理。
后续的调研中,我们首先排除了 lua-nginx-module,因为我们主要是提供软件服务,我们一大部分客户是大型企业,如果其内部采购了F5,可能会对我们采用 Nginx 来作为负载均衡提出不同的意见,这会面临商务上的一些风险;第一种方案,在研究了 Dubbo 的源代码后,发现其实现模式和我们的设想非常接近。对项目代码的改造,大部分可以通过修改 XML 文件配置来完成。而且 Dubbo 的产品成熟度、文档质量也比较高,并且已经被很多大型公司广泛应用。当时我们团队成员只有4个人,考虑到资源的有限以及Dubbo框架的优秀。最后我们选择的基于 Dubbo 的方案进行技术框架的改造。
Dubbo
由于 Dubbo 在国内公司的广泛应用,对 Dubbo 框架的分析也是车载斗量。这里仅作为我学习和使用这个优秀的 RPC 框架的总结吧,毕竟我连 BlazeDS 都写完了,呵呵。
1. 概览
Dubbo 的整体架构也符合 RPC 标准范式
- 整体流程为Container启动后,也启动其中的 Provider,此时的 Provider 已经可以接入。
- 将 Provider 的相关信息(主要是服务名称、IP、端口、协议等等)注册到 Registry。
- Consumer 向注册中心注册需要调用的服务监听器,目的是为了当 Registry 有可用服务的时候可以通知 Consumer。
- 当 Registry 有可用服务的时候,通知之前注册的 Cosnumer,有哪些 Provider 是可以调用的。以Zookeeper为例子,只要对应的节点有Change,就会发起监听回调。
- Consumer 在收到通知之后,从 Provider 中选择一个或者多个,直接通过网络连接发起对 Provider 的请求调用,之后的通信就直接在 Consumer 和 Provider 之间进行,和注册中心没有关系。
- Consumer 和 Provider 都会在内存中统计调用信息,定时发送给 Monitor 作为监控使用
对比我们之前采用的 RPC 架构,Dubbo 从架构层面,最重要的是引入 Registry 和 Monitor。Registry 主要实现了服务发现、服务注册、服务治理。有了 Registry 之后,Consumer 和 Provider 获得更大程度的自由,彼此可以独立进行开发和演化。
- Consumer 无需配置固定的 Provider 地址,通过 Registry “注册-通知”机制可以动态获取 Provider 服务列表,再根据负载均衡算法和路由规则对 Provider 进行访问。
- Provider 变化相对比较小,其主要的变化是在需要向 Registry 发布自己的服务状态。
- Registry 本质上是一个注册表,Consumer 查询,Provider 注册。并且通过对注册和发布信息(主要是发布信息)的修改,可以实现服务治理的功能,比如调整 Provider 的负载权重,将新数据通知 Consumer 就可以干预服务调用,实现服务治理。
- Monitor 提供了一个各个节点服务状况统计信息的汇集点,通过这个汇集点可以知悉整体服务的运行情况。
性能层面,可选择高性能的 Netty 通信框架和 dubbo 序列化框架的组合,可以提供超过单机超过 10000 TPS 的支撑。在绝大多数场景下,通信环节都不会成为性能瓶颈。
功能层面,除了最重要的服务治理之外,异步调用、并发控制、多协议和多版本支持都是十分常用和实用的功能。同时 Dubbo 提供基于 SPI 的拓展机制,也可以方便我们在使用的过程中做一些魔改,以兼容已有的系统架构。例如通过 Filter 拓展,实现隐形传递 Session 信息、调用日志和已有日志分析系统的集成。
易用性方面,Dubbo 提供和 Spring 集成的结构,实际上和我们之前对 Spring Remoting 的封装非常接近,迁移成本很低,这也是最终方案能够获批的一个重要的原因。
缺点,作为一款 RPC 框架,Dubbo已经非常成熟,众多公司的选择也说明其可靠性,在获得了开源的代码和足够良好的拓展性之后,部分功能上的不足,也可以通过自己拓展来覆盖需求。dubbo 的缺点其实是类似这种共享内核式的二进制 RPC 框架的缺点:SOA的一个核心目标是服务的复用,而对于我们的系统而言,不同的系统被划分成不同的模块,不同的模块之间相互调用都需要共享 jar 包,随着关系越来越复杂,时常会出现由于序列化和参数不满足导致的调用失败的情况,实际上就是 jar 包没有同步更新到,还有一些情况由于 jar 包的共享而导致的循环依赖问题,导致工程中引入无关的业务代码。
解决的办法采用的是规范项目的管理和发布流程
- 以上层调用下层为准,最坏不能出现循环依赖。
- 接口兼容性设计,小版本升级不兼容的接口进行增加而非修改,结合日志跟踪系统最后统一下线旧接口。
后面有部分项目采用 rest 协议,系统之间的交互全部使用 JSON 格式,这样管理和便利性都所有提升。
2. 设计
下面谈一谈 Dubbo 的设计
2.1 URL 总线设计
如上面这张Dubbo整体架构图可知,Dubbo按照功能分成了多个不同的层,具体每层的功能和作用可以再官方网站查看 Apache Dubbo
框架在初始化和使用的过程中,需要在层和层之间交换参数,例如服务注册的时候需要将注册信息传递给 Registry,服务暴露的时候需要通知 Protocol 使用什么协议暴露以及暴露服务的各项参数、生成客户端代理的时候需要知道要调用的远端 Provider 的地址、超时时间等等。一般情况下,我们会将参数抽象成为 POJO,但是这样拓展性较差,需要不断新增参数类型,或者调整参数,沟通成本也比较大,一种可行的方案是将参数定义在一个 Map 里面,采用参数透传的办法。
对于Dubbo这样拥有非常多配置参数并且经常需要进行参数调整的框架,采用 POJO 的方式显然比较麻烦。因此,Dubbo 采用 URL 作为配置信息的统一格式,将各层之间的通信抽象为URL的形式。例如采用 Dubbo 暴露一个服务会对应到下面的 URL。
dubbo://192.168.0.211:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.0.211&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8492&qos.port=22222&side=provider×tamp=1542703042366
Config 层获取需要暴露服务的配置信息,将其封装在 URL 中,并传递到 Protocol 层。Protocol 层,根据 URL 的内容,可以解读出:使用 dubbo 协议暴露DemoService服务,绑定在211服务器和20880端口,以及其他相关的信息,然后根据这些信息找到处理 dubbo 协议的 DubboProtocol,并根据对应的信息,暴露服务,并且在剩下暴露服务的过程中,也采用这种 URL 总线的方式添加和传递参数。
这种方式和采用 Map 传参的方式很接近,实际上在 dubbo 中也是使用一个 map 来保存配置参数信息,key 值基本都定义在 common.Constants 中。
这种 URL 总线的设计,也是后面的 SPI 拓展点的基础。
2.2 拓展机制
Dubbo 整体架构采用“微内核+插件”的开发模式,“微内核只负责组装插件”。
要怎么理解这两句话?首先要了解面向对象软件设计的 SOLID 原则中的 OCP(开闭原则),这个原则要求软件中的对象,需要对拓展开放,而对修改封闭。换而言之,就是需要在不修改源代码的情况下,对软件进行拓展和修改。一个典型的例子,就是继承,通过集成父类,并覆盖父类的方法实现来进行拓展。
要设计一个可拓展的框架或者系统
- 首先要识别出核心领域模型和可拓展点,核心领域模型可以从系统要解决的问题域中抽取。对于RPC框架而言,最重要的就是协议和调用,在Dubbo中,分别对应 Protocol 和 Invoker。
- 接着定义可拓展点,框架首先要定义好可拓展点的功能和在框架中的角色,一般使用接口或者抽象类的方式。例如,图中核心的接口:ProxyFactory、Registry、Cluster、Protocol、Exchanger等等,就是Dubbo在各个层定义的可拓展接口。
- 可拓展意味着开发者可以根据需要灵活拓展和替换部分系统功能的实现,因此不能直接依赖实现而是要根据条件选择不同的实现。
从上面的简单分析,大致就可以大致知道如何设计一个可拓展的系统
- 围绕核心域设计框架的主要代码骨架,定义可拓展点,代码的骨架定义了要做什么,使用到那些拓展接口,但是具体使用那些实现还不知道。
- 在框架启动或者运行的过程中决定要使用那些拓展实现(就是“插件”),称为装配策略,插件决定了框架的具体行为。
- 需要一个管理插件生命周期的容器,就是“微内核”,微内核可以根据装配策略,来组装和获取不同的插件实现。
这样就可以理解上面“微内核+插件”的开发模式。具体一点,Dubbo首先定义了核心的功能,对 RPC 框架来说就是服务的暴露和引用这些都定义在 Protocol 类中,但是不同的协议可能会有不同的暴露和引用方式,因此 Protocol 即使核心域又是拓展点。Dubbo在运行的过程中,需要根据用户的配置(前面已经说到了用户配置都被抽象成为 URL),来决定使用哪一个 Protocol 的实现,这个实现需要从微内核中获取;微内核根据参数将对应的Protocol实现组装好,返回给框架。其他的功能也是类似这样的处理方式。
整个框架中,流程定义由框架本身完成也就是骨架代码;具体的实现由插件完成,开发者可以在不修改框架源代码(也就是不改变框架主流程)的情况下自由实现框架的功能;骨架代码和插件之间需要通过微内核来粘合,微内核不实现功能只是作为插件的管理者,骨架代码不依赖具体的插件,而是依靠微内核来获取插件。
这样的微内核,一般都采用 Factory 模式来实现。例如 Spring 的 BeanFactory 就是整个 Spring 框架的微内核,Spring 以 Bean 为核心,各种不同的Bean实例都可以注册到 BeanFactory 内,Spring 提供 IoC 和 AOP 的功能,对Bean实例进行组装,而使用方不需要处理这些细节,只需要从Sprign 的 BeanFactory 容器中根据条件获取Bean实例即可。Spring 框架的其他组件,都是建立在这个微内核基础之上的具体实现,典型的通过AOP功能来生成具有特定功能的Bean代理类。
Dubbo采用的微内核是参考Java SPI 拓展,并在其基础上实现了简单的 IoC 和 AOP 功能。这边挑一些我觉得比较重要和难理解的点。
- 微内核实现在 ExtensionLoader,采用工厂模式。
- 基于 Java SPI 的拓展方式,要求在 META-INF/services(META-INF/dubbo、META-INF/dubbo/internal)目录下,放置文件名为接口全称。文件中为key、value键值对,value为具体实现类的全类名,key为标志值。
- 由于Dubbo采用服务总线的设计,因此决定使用哪个插件,由URL中的参数来决定。
- AOP 功能,可以采用Wrapper来实现,Wrapper需要实现拓展接口,并且有使用拓展接口作为参数的构造函数,从微内核中获取的实例会使用对应的 Wrapper 包裹
- @SPI ,主要定义拓展点的元信息,标注一个拓展点。并提供一些元数据,比如默认的拓展实现标志值
- @Active,主要用于标志一个插件自动激活,自动激活的插件,可以通过,
ExtensionLoader.getExtensionLoader(Protocol.class).getActivateExtension(url,"")
获取。也可以定义在注解中的元数据,从所有激活的插件中筛选符合条件的插件,例如只对 Consumer 或者 Provider 生效的 Filter 等。 - @Adaptive,字面意思为适配。即可标注在类上,也可以标注在方法上。
- 标注在类上,其目的是生成一个固定的适配实例。@SPI 标记的拓展接口,在获得插件的时候调用
getAdaptiveExtension
会默认返回具有@Adaptive 标记的实现类。默认情况下Dubbo只有两个类使用@Adaptive注解(AdaptiveCompiler、AdaptiveExtensionFactory) - 更多情况下,@Adaptive 标注在拓展接口的方法上,这要求标注的方法的参数或者参数中的属性需要有一个为 URL 类型。解拓展接口获取具体实现的时候,会根据 @Adaptive 注解的 value 值作为key,从 URL 的 parameter 中获取对应的 value 作为要适配的拓展实现的标志值。如果没有名字,则使用 @SPI 中设定的默认值。举一个例子,如果一个拓展接口有两个实现redis,zookeeper,在方法中注释@Adaptive(“protocol”),也就是根据 URL 的协议来决定拓展实现。则当参数中URL的协议为 redis 的时候,会使用 redis 对应的实现,为 zookeeper 的时候,会使用zookeeper的实现,这是@Adaptive的一般用途。也就是拓展未知的适配类型,具体使用的实现交给URL决定也就是调用方。Dubbo 的动态适配过程也大多数是基于此实现。
- 标注在类上,其目的是生成一个固定的适配实例。@SPI 标记的拓展接口,在获得插件的时候调用
下面看看拓展机制的具体实现
在 ExtensionLoader 中,每一个拓展点都对应一个 ExtensionLoader,可以通过类型来获取,ExtensionLoader.getExtensionLoader(Common.class)
。
第一个核心方法是获取指定标示值的拓展实现ExtensionLoader.getExtensionLoader(Common.class).getExtension("extName");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
Reference<Object> reference = cachedInstances.get(name);
if (reference == null) {
cachedInstances.putIfAbsent(name, new Reference<Object>());
reference = cachedInstances.get(name);
}
Object instance = reference.get();
if (instance == null) {
synchronized (reference) {
instance = reference.get();
if (instance == null) {
instance = createExtension(name);
reference.set(instance);
}
}
}
return (T) instance;
}
其中 name 为 @SPI 配置参数中的 key 值,在外部调用的时候,可以从URL中的获取或者构建。在这边的代码中使用了一个双重检查的单例构建形式,重点的代码在createExtension(name),里面描述了如何通过 extension 的名字,来构建一个拓展的实例,方法内使用了几层调用,为了阅读方便,下面只选择一些重点环节做说明:
- 获得拓展类型(ExtenstionLoader.type)类型中,Extension的默认实现对应的名称,这个名称只能有一个
- 获取 ClassLoader,首先获取当前线程对应的ClassLoader,其次获取加载ExtensionLoader 类的 ClassLoader
- 通过 ExtensionLoader.type 从META-INF/services以及META-INF/dubbo中读取配置文件的内容,内容为 key-values 形式, key 为 type 的全限定名,
- 得到 value 中对应类型的Class对象,如果其具有@Adaptive注解,则将其标记为cachedAdaptiveClass,一个类型,只能有一个实现使用@Adaptive注释,这个和一般的IoC原理是一样的
- 判断class对象是否具有一个以 ExtensionLoader.type 为参数的构造函数,如果有,则认定其为一个 Wrappe r类,将其加入到 cachedWrapperClasses 集合中
- 根据规则获取类的简称,一般未对应k-v文件的key
- 检查class对象是否使用了@Activate 注解;若使用则获取该注解,并存入cachedActivates 对象中,key为类的简称,若有多个简称则取第一个;
- 保存类的简称和类到extensionClasses中
上面的步骤是 getExtensionClasses 的主要内容,可以看出这个步骤主要是读取 @SPI 文件,并解析得到拓展类的各种实现。之后我们只要通过extension name就可以获取对应规则的实现类了,获得实现类之后显然需要根据实现类来进行实例化:
- 首先获得拓展类的实现,并按照规则进行注入操作,主要实现在injectExtension(instance),通过和 Spring 或者默认的 SPI 拓展体系的集成,可以根据 setter 方法的参数,从上下文中获得对应的实例对象,并注入到目标实例中。
- 如果存在包装类(Wrapper),则使用包装类层层包装 instance,这样设计的主要目的是可以实现类似 AOP 的织入功能
- 返回处理完成的实例
以上就完成了 getExtension
的调用
另一个常用的是自适配拓展,根据 URL 参数获取拓展实现
1
2
3
Common ext = ExtensionLoader.getExtensionLoader(Common.class).getAdaptiveExtension();
URL url = new URL("impl3", "1.2.3.4", 1010, "path1");
System.out.println(ext.echo(url,"aa"));
其中最重要的 getAdaptiveExtension 方法,除了在会调用前面的getExtensionClasses 等方法初始化整个 SPI 拓展类之外,其主要方法是创建 Adaptive 代理类
1
2
3
4
5
6
7
8
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler
= ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class)
.getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
- createAdaptiveExtensionClassCode实际上是生成一个创建类型获得ExtenstionLoader.type+$Adaptive为类名,并实现type接口,将其各个方法,使用代理模式实现为 Common extension = ExtensionLoader.getExtensionLoader(Common.class).getExtension(extName)形式,其中 extName 从URL中获取,然后使用 extension 调用对应的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package org.apache.dubbo.common.extension.adaptive;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Common$Adaptive implements org.apache.dubbo.common.extension.adaptive.Common {
private static final org.apache.dubbo.common.logger.Logger logger = org.apache.dubbo.common.logger.LoggerFactory.getLogger(ExtensionLoader.class);
private java.util.concurrent.atomic.AtomicInteger count = new java.util.concurrent.atomic.AtomicInteger(0);
public void echo(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = url.getProtocol();
if (extName == null)
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.common.extension.adaptive.Common)
name from url(" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.common.extension.adaptive.Common extension = null;
try {
extension = (org.apache.dubbo.common.extension.adaptive.Common) ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.extension.adaptive.Common.class).getExtension(extName);
} catch (Exception e) {
if (count.incrementAndGet() == 1) {
logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.common.extension.adaptive.Common, will use default extension null instead.", e);
}
extension = (org.apache.dubbo.common.extension.adaptive.Common) ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.extension.adaptive.Common.class).getExtension("null");
}
extension.echo(arg0);
}
}
- 获取Compiler的拓展,它有三个实现类AdaptiveCompiler,JdkCompiler,JavassistCompiler,其中 AdaptiveCompiler 是代用@Adaptive注解,也就是默认情况下获得的为 AdaptiveCompile r实例,他也是一个代理类,具体实现编译代码功能的还是其他两个类,这个也是使用 @Adaptive 注解类型的一个应用,添加到一个默认的代理类上,实际上是一个固定的,不需要通过 URL 就可以进行拓展的例子。Compiler默认情况下是使用 JavassistCompiler 实现,通过他将生成的 code,编译和加载称为一个 Class,再经过 injectExtension 等操作,最后得到一个可以动态根据 URL 内容实现代理的适配器。
总结
Dubbo的微内核+插件的拓展方式,是实现 OCP 设计原则的一个很好的范本。在这个简单而轻量的内核之上,Dubbo 定义骨架代码和拓展点,Dubbo的使用者可以拥有和Dubbo的开发者一样的拓展能力,而不需要去修改比较稳定的骨架代码。实际上Dubbo的主要功能,都是通过各式各样的拓展插件来实现的。这种根据需求,增量式的拓展,比起在一份核心代码上进行修改要灵活和高效得多。后续在设计公司的多模块集成框架的时候,也很多参考和借鉴了 Dubbo 的这种设计思路。
服务暴露和注册
服务的暴露,首先要从服务的声明开始。在 Dubbo 中,服务声明对应的Java API 为 ServiceConfig
。
先通过setter,对ServiceConfig
进行赋值,典型的我们可以设置服务要发布的协议、服务接口、服务实现、注册中心、并发控制参数等等。
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
暴露服务,其中 ProtocolConfig
定义了协议,registryURLs
这是注册中心 URL,Dubbo 支持多协议和多注册中心暴露和发布同一个服务。
当前面的参数都准备就绪之后,要暴露服务的原始 URL 大致为 dubbo://192.168.27.211:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.27.211&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3184&qos.port=22222&side=provider×tamp=1544600701940
经过转换,变成injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.27.211&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3184&qos.port=22222&side=provider×tamp=1544600701940
,根据前面提到到 SPI 拓展机制,Dubbo 首先在使用InjvmProtocol
协议暴露一个本地服务。其主要的作用是根据服务实现对象(ref)创建一个本地的 Invoker 对象,并且将其保存到exporterMap
备用。
接下来就是服务注册和对外发布的部分了,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// registry add export key
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
首先 registryURL 为 registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=3184&qos.port=22222®istry=multicast×tamp=1544600649422
,最重要的代码是registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())
将要暴露的服务URL,作为registryURL的一个参数,key值为 export,结果为:registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo://192.168.27.211:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.27.211&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3184&qos.port=22222&side=provider×tamp=1544600701940&pid=3184&qos.port=22222®istry=multicast×tamp=1544600649422
同样,根据之前的 SPI 拓展原理,registry 对应的 Protocol 的实现类为 RegistryProtocol
,实际上所有的 Protocol实现类,还有两个 Wrapper 类,分别为 ProtocolListenerWrapper
和 ProtocolFilterWrapper
。不过这部分和本章节的内容关系不大,暂时略过,实际的暴露服务和向服务中心发布服务,都是在RegistryProtocol
中完成。
先看暴露服务,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
这段代码中,最主要是从originInvoker
中分离出要暴露的服务 URL 也就是前面提到的 dubbo://192.168.27.211:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.27.211&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3184&qos.port=22222&side=provider×tamp=1544600701940
,接着还是通过 SPI 拓展,找到对应的实现类 DubboProtocol,进行服务的暴露。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
首先将包含有服务实现类的 Invoker 封装成为 Exporter 并保持到 exporterMap
中,这一步很重要,后续在接收到客户端请求的时候,会根据 serviceKey 从 exporterMap
中获取 Invoker 执行,这一步我们留着后面再描述。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
每一种协议,只需要暴露一个服务就足够。
1
2
3
4
5
6
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
接下来重点就是要启动本地的服务监听程序。继续跟openServer(url)
的代码,启动服务监听器的工作在 server = Exchangers.bind(url, requestHandler)
,从名字就可以看出,requestHandler
的作用,是用于处理客户端发起的请求。requestHandler
是 DubboProtocol 的内部类,是 ExchangeHandler
的实现类,主要的方法是reply
,从exporterMap
中获取 Invoker
并获取执行结果。
RPC 通过网络调用,Server 端需要接收 Client 端发过来的请求,并且找到处理类,并将结果返回。这一个交互的活动在 Dubbo 中被抽象为 Exchange 信息交换层,用来处理 RPC 调用中的请求-响应模式,其下是 Transporter 层,后者负责网络数据的传输。
Exchange 和 Transporter 的相互关系和功能我们在通信细节来展开。在这里,Exchangers 只是一个 Facade 类,完成客户端服务器启动的是 HeaderExchanger
。从 Dubbo 的工程结构上来讲,这一步开始都在 dubbo-remoting 层。在服务暴露环节,我们的目的就是创建一个Server 端。
1
2
3
4
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger
将前面介绍的 requestHandler 封装到 HeaderExchangeHandler
和 DecodeHandler
中,其中HeaderExchangeHandler
是 Exchange 层模拟 Request-Response 的关键。同样,我们在通信细节来展开。
通过一些列的 SPI 拓展,最终找到 NettyTransporter
,我们想要最终要创建的 Netty 服务
1
2
3
4
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
从 NettyServer 的继承结构中可以看出,它既是Server,用来启动一个 Netty 服务端,又是一个 Handler 用来处理网络请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
最后启动一个 NettyServer,其中,NettyServerHandler
是 Netty 通信 API 和 Dubbo 通信 API 的桥梁,NettyCodecAdapter
负责编码和解码等操作。
到这里,服务端的监听程序就启动完毕,当接收到客户端请求后,进入NettyServerHandler
处理,这些是通信细节方面的内容。
接着看 Dubbo 如何将服务注册到注册中心中,这需要回到 RegistryProtocol
中。
1
2
3
4
5
6
7
8
9
10
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//XXX + why didn't use registry directly.
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
首先从 invoker
中获取要注册到注册中心的 URL,本例中为 dubbo://192.168.27.211:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=6544&side=provider×tamp=1544692085824
1
2
3
4
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
接着又是老套路,通过SPI拓展,根据 URL,找到对应的注册中心,这里我们使用 Zookeeper,那么对应的就是 ZookeeperRegistry
1
2
3
4
5
6
7
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
最终通过zkClient 客户端,将URL转换成 path,写入到 Zookeeper 中。
服务引用和发现
服务的暴露和发布说的是如何启动服务端,客户端的启用则对应服务的引用和发现。
Dubbo 服务的引用对应的 API 为 ReferenceConfig
。在前面讲 RPC 的文章中,我们有说过,远程调用的常见套路就是需要再客户端生成一个 Proxy
,由这个 Proxy
来完成网络交互、请求和结果的 encode 和 decode。在 Dubbo 中同样是这样,ReferenceConfig
的核心方法为init
。
首先收集各种配置信息,存储到一个 map 对象中,这个对象后续会作为 URL 的 parameters,进入createProxy(Map<String, String> map)
方法,我们需要创建一个可以进行远程调用的Invoker
Invoker
,是 Dubbo RPC 调用的一个核心 API ,代表一个可执行的对象(本地、远程、集群),可以根据方法和参数,获取执行的结果,最典型的应用就是使用Invoker
来发起一个远程调用Invocation
,封装了需要执行的方法,参数等信息。目前实现类只有RpcInvocation
Invoker
的核心方法是Result invoke(Invocation invocation)
首先是处理 JVM 内部调用,前面我说了injvm
的模式会在本地创建一个服务实例,如果是使用这个模式,那么对应的Invoker
只要从exporterMap
取出之前写入的Exporter
执行即可。
1
2
3
4
5
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using in-jvm(local) service " + interfaceClass.getName());
}
refprotocol
是 ReferenceConfig
的类变量,一个SPI Adaptive 拓展变量,injvm
模式比较简单,不做太多的描述。
第二步,要实现远程服务的调用,首先要获取远程服务的地址,在前面的概览中我们有提及,在 RPC 中,这个步骤一般通过 Registry 来协作完成,在Dubbo中,我们在服务发布阶段,已经将服务提供者的信息发布到了 Zookeeper。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// registryProtocol ?
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference "
+ interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
首先ReferenceConfig
可以直接配置目标 URL,绕过Registry,实现点对点的通信,如果 url 使用registry
作为协议,则认为是使用独立的注册中心。如果没有额外配置,一般是通过loadRegistries(false)
来获取注册中心的地址,在本例中是这个 URL 是registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=940&qos.port=33333®istry=zookeeper×tamp=1544756017606
,接着将之前保存有各种配置信息的 map 作为parameters 添加到 URL 中,为registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=9088&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D9088%26qos.port%3D33333%26register.ip%3D192.168.54.53%26side%3Dconsumer%26timestamp%3D1544756253395®istry=zookeeper×tamp=1544756263186
接下来就是根据 URL 生成 Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (urls.size() == 1) {
// only one remote provider
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// cluster provider
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
如果 URL 只有一个,则进入 refprotocol.refer(interfaceClass, urls.get(0));
,如果 URL 有多个,则分别调用,根据 SPI 拓展,定位到 RegistryProtocol
的 refer
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// create registry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
// if service is RegistryService return directly
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// configure cluster strategy
return doRefer(cluster, registry, type, url);
}
首先根据 URL 创建 Registry
,这里对应是 ZookeeperRegistry
,其中和服务发现最相关的方法是void subscribe(URL url, NotifyListener listener);
,参数 url 表示订阅的条件,listener 是订阅的节点发生变化时候的监听器,目的是及时刷新对应的 Invoker
。接着进入 doRefer
,一般情况下doXXX
才是完成具体的逻辑的地方。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// directory : aka a dynamic invoker include a list of invoker
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// consumer register
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// consumer subscribe
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// create invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
首先创建RegistryDirectory
,Directory
代表多个Invoker
,可以把它看成List<Invoker>
,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。然后向注册中心注册 consumer 节点,这样我们可以通过注册中心了解消费端的情况。再通过Directory
订阅服务,此时URL 经过加工变成 consumer://192.168.27.211/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5940&qos.port=33333&side=consumer×tamp=1544770768785
,调研directory.subscribe
1
2
3
4
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
这里需要注意,在使用 registry 创建订阅的时候,监听器就是 RegistryDirectory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// notify
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// notify registryDirectory. synchronized
notify(url, listener, urls);
使用,ZookeeperRegistry
,注册的代码是在对应的 Zookeeper Node 中添加监听器,当 Node 的值发生变化的时候,触发对应的监听器事件。在本例中,也就是 RegistryDirectory
的 notify
方法。需要注意,在ZookeeperRegistry#doSubscribe
方法的最后,需要直接调用notify
方法,同步触发Invoker
的初始化,之后的变化才是异步更新。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url
+ " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
按照 URL 的分类不同,分别更新路由规则 routes
、配置configureators
还有Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// Forbid to access
this.forbidden = true;
// Set the method invoker map to null
this.methodInvokerMap = null;
// Close all invokers
destroyAllInvokers();
} else {
// Allow to access
this.forbidden = false;
// local reference
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
//Cached invoker urls, convenient for comparison
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
// Translate url list to Invoker map
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// Change method name to map Invoker Map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" +
invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
// copy new urlInvokerMap to local reference
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
刷新Invoker
,首先将 URL 转换成为对应的 Invoker
,这个步骤最重要的就是创建远程调用的Invoker
,对应代码 invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
很显然,这个Protocol
也是使用 SPI 进行拓展,对应的实现类是DubboProtocol
,这边稍后再补充,先抓紧订阅服务的主线。
接着替换Invoker
引用urlInvokerMap
。这样当 Zookeeper 中的节点发生变化的是,RegistryDirectory
中的Invoker
map 也可以保持最新,这一个过程都是异步完成的。
回到RegistryProtocol
中,directory.subscribe
只是保证各项配置的及时更新,我们知道 Dubbo 可能会同时有多个服务的提供者提供服务,这个时候就需要使用路由策略(负载均衡或者服务聚合)来选择具体要调用哪一个服务提供者。这一步在Invoker invoker = cluster.join(directory);
中实现,Cluster
中,集群策略会将 Directory
伪装成一个Invoker
返回,不同的 Cluster
算法就对应不同Invoker
1
2
3
4
5
6
7
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
这里选择带有重试功能的FailoverClusterInvoker
,它会在调用一个Invoker
失败之后,再循环调用若干次,直到成功。
1
2
3
4
5
6
7
8
9
10
11
12
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
AbstractClusterInvoker
定义基本的调用逻辑,找到invokers
、选择负载均衡算法,调用具体的实现doInvoke
,具体的逻辑比较简单,这边就不补充代码了。
最后我们再补充DubboProtocol
如何创建一个Invoker
1
2
3
4
5
6
7
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
Dubbo
的服务调用方中,最重要的就是要获取连接远程服务端的客户端,其方法就是getClients(url)
,到这里,我们可以发现 URL 在Dubbo中的重要地位,几乎所有参数都通过 URL 来传输,这也是利用它半规范化的设计,一部分固定的参数,加上一部分灵活的 parameter参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
// client concurrent control
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
获取客户端有两种方式,一种是共享,一种是重新初始化,我们这里主要看看重新初始化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
Exchangers.connect(url, requestHandler)
的调用过程和创建前面创建服务端的NettyServer
非常类似,最后创建基于 Netty
的NettyClient
,这边不再赘述。
1
2
3
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
NettyClient
创建了和远端服务的通信Channel
,获取NettyClient
之后,一样会被重重包装成为HeaderExchangeClient
,后者最终作为构造参数,创建DubboInvoker
。 Invoker
就可以进行远程方法调用,在客户端,最后完成的一个步骤,则是使用这个Invoker
,创建对应服务接口的代理类proxyFactory.getProxy(invoker);
,这样客户端就可以无感知地使用代理类来进行 RPC 调用了。用来进行后续的通信。
总结
Dubbo的服务暴露和引用,整个过程逻辑比较简单,如果从底层开始向上看,先是从创建 Client 和 Server,这样就拥有了处理网络通信的能力。再上一层,通过层层封装,增强框架的功能。并且和其他的关联组件比如 Registry 等建立桥接。而这些功能都是建立在 SPI 拓展的基础上。
通信过程
从实现的细节上Dubbo 要比之前的 Spring Remoting 等要复杂很多,但是功能上却是一致的。比较复杂的部分在于,Dubbo 需要处理异步的消息调用,而我们之前采用的 Hessian+Spring Remoting 这是基于 HTTP 的同步通信。也正是如此,Dubbo 才需要引入独立的 Exchange
层来处理同步调用到异步消息的转换。下面我们就来看看,Dubbo 具体如何处理通信层的逻辑。
Exchange 也称为信息交换层,主要的作用是封装 请求 - 响应 模式,将同步调用转换为异步调用,以
Request
、Response
为中心,扩展接口为Exchanger
,ExchangeChannel
,ExchangeClient
,ExchangeServer
。
客户端逻辑
前面我梳理客户端创建远程代理的整个过程。现在客户端有了一个动态代理。当调用 Proxy 方法的时候,方法会被InvokerInvocationHandler
拦截
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RpcInvocation invocation;
if (RpcUtils.hasGeneratedFuture(method)) {
Class<?> clazz = method.getDeclaringClass();
String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
invocation = new RpcInvocation(syncMethod, args);
invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
} else {
invocation = new RpcInvocation(method, args);
if (RpcUtils.hasFutureReturnType(method)) {
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
}
return invoker.invoke(invocation).recreate();
将调用方法和参数封装成为RpcInvocation
,最后调用invoker.invoke
,我们知道当前的Invoker
实现类是FailoverClusterInvoker
1
2
3
4
5
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
Result result = invoker.invoke(invocation);
return result;
首先调用select
选择一个Invoker
,通过前面的分析可知,这个实现类就是DubboInvoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// deal with async call
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
// register resultCallback, sometimes we need the async result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
RpcContext.getContext().setFuture(null);
// sync call current thread will block here until future.isDone or timeout exception throw.
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
}
}
DubboInvoker
中,首先是拿到ExchangeClient
,它是在前面我们使用new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
创建的,包含了NettyClient
和客户端调用在Exchange
层的核心逻辑。
接下来就要使用ExchangeClient
开始远程调用了,调用分成三种方式
- 单向通信,调用
ExchangeClient#send
,直接返回new RpcResult()
- 异步双向通信,调用
ExchangeClient#request
,会返回一个异步的DefaultFuture
,接着注册适配器FutureAdapter
,并为调用方返回一个AsyncRpcResult
,它类似 Java JUC 线程池中的Future
,可以通过getValue
来堵塞获取返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public FutureAdapter(ResponseFuture future) {
this.future = future;
this.resultFuture = new CompletableFuture<>();
future.setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
Result result = (Result) response;
FutureAdapter.this.resultFuture.complete(result);
V value = null;
try {
value = (V) result.recreate();
} catch (Throwable t) {
FutureAdapter.this.completeExceptionally(t);
}
FutureAdapter.this.complete(value);
}
@Override
public void caught(Throwable exception) {
FutureAdapter.this.completeExceptionally(exception);
}
});
}
可以看到FutureAdapter
继承了J.U.C的CompletableFuture
,并且在DefaultFuture
中注册了回调函数,这样当DefaultFuture
获取异步返回值完成调用的时候,会回调done
方法。同时这也会触发AsyncRpcResult#getValue
可以返回响应值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public AsyncRpcResult(CompletableFuture<Object> future, final CompletableFuture<Result> rFuture, boolean registerCallback) {
if (rFuture == null) {
throw new IllegalArgumentException();
}
resultFuture = rFuture;
if (registerCallback) {
future.whenComplete((v, t) -> {
RpcResult rpcResult;
if (t != null) {
if (t instanceof CompletionException) {
rpcResult = new RpcResult(t.getCause());
} else {
rpcResult = new RpcResult(t);
}
} else {
rpcResult = new RpcResult(v);
}
rFuture.complete(rpcResult);
});
}
this.valueFuture = future;
this.storedContext = RpcContext.getContext().copyOf();
this.storedServerContext = RpcContext.getServerContext().copyOf();
}
public Result getRpcResult() {
Result result;
try {
// return when resultFuture.complete
result = resultFuture.get();
} catch (Exception e) {
// This should never happen;
logger.error("", e);
result = new RpcResult();
}
return result;
}
- 同步双向通信:直接返回
DefaultFuture#get
,则会堵塞调用线程,直到DefaultFuture
获取响应值。
接下来我们分析同步双向通信的方式,currentClient.request(inv, timeout).get()
,转换成异步的方式上面已经由描述,实际上就是增加回调函数,避免堵塞调用线程。
同步、异步、堵塞、非堵塞是非常容易混淆的概念,这边贴一下我认为比较准确的描述:同步与异步,是指对于请求的发起者,是否需要等到请求的结果(同步),还是说请求完毕的时候以某种方式通知请求发起者(异步)。在这个语义环境下,阻塞与非阻塞,是指请求的受理者在处理某个请求的状态,如果在处理这个请求的时候不能做其它事情(请求处理时间不确定),那么称之为阻塞,否则为非阻塞。
在初始化HeaderExchangeClient
的时候,我们使用了NettyClient
作为参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
通信的核心在于HeaderExchangeChannel
信道,前面提到的currentClient.request
方法,就是通过它来发送请求
1
2
3
public ResponseFuture request(Object request, int timeout) throws RemotingException {
return channel.request(request, timeout);
}
找到对应HeaderExchangeChannel
中的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException("");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// return future.
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
在这里,首先创建Request
,还记得前面我提到的,Exchange
层是围绕着 Request
和 Response
来构建的,这里就是实现的入口,将前面的Invocation
转换为Request#data
,通过 channel
发送,这个channel
就是之前包装好的NettyClient
,查看前面的继承结构图,NettyClient
也是一个org.apache.dubbo.remoting.Channel
实现。并且创建一个 DefaultFuture
用于后续异步获取 Response
。
1
2
3
4
5
6
7
8
9
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
可以看到DefaultFuture
使用全局变量FUTURES
保存 DefaultFuture
实例,并且使用requestId
作为 key,这样后续就可以通过这个全局变量向对应的DefaultFuture
设置响应值,并触发完成动作,要获取DefaultFuture
的响应值,可以通过get
方法来堵塞调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
实现细节,通过对done
和响应值状态进行控制,使用细节可以参考 JUC 中的 Future
。
发送Request
就使用NettyClient
来实现,发送代码在其抽象类AbstractClient
中。
1
2
3
4
5
6
7
8
9
10
11
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
connect
方法要求Client
实现要创建和远端的连接,getChannel
获取通信的信道,这两个方法,不同的实现类会有差异,我们这里只使用Netty作为例子,connect
使用 Netty 的Bootstrap
创建和远程服务端的连接,并且获取io.netty.channel.Channel
,注意这个是 Netty 的通信信道,而getChannel
,这是使用NettyChannel.getOrAddChannel(c, getUrl(), this)
,将io.netty.channel.Channel
,封装转换为NettyChannel
,NettyChannel
是 Dubbo 的 org.apache.dubbo.remoting.Channel
和 Netty 的Channel
之间的桥梁,将 Dubbo Channel
的 API 桥接到 Netty Channel
上实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
NettyChannel
使用ChannelFuture future = channel.writeAndFlush(message)
完成了消息的发送。因为我们使用NettyClientHandler
作为 Netty 客户端的处理类,消息发送之后,还需要经过NettyClientHandler
,但是这块的逻辑在消息端并没有太多内容,最终会调用HeaderExchangeHandler#sent
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
主要是进行一些异常的处理,调用DubboProtocol#requestHandler#sent
方法,以及DefaultFuture.sent(channel, request)
记录发送状态和发送的时间。
到这一步,我们已经将消息,通过Channel
发送给了服务端。服务端的逻辑我们放在后面将,客户端要怎么处理服务端返回的响应呢,又如何将响应值返回给调用方?让我们接着往下看。
我们在创建NettyClient
的时候,会将DubboProtocol
中的requestHandler
作为参数来构建NettyClient
1
2
3
4
5
6
7
8
9
10
11
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
然后又包装了两层,DecodeHandler
用于解码,HeaderExchangeHandler
用于模拟 Request
- Response
请求模式,也就是关联请求和响应。
1
2
3
4
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
接着会继续包装多个Handler
用来拓展功能
1
2
3
4
5
6
7
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
wrapChannelHandler
的代码比较简单,分别包装MultiMessageHandler
用来处理同时返回多个消息的情况。使用 SPI 拓展,获取消息分发器,默认的是 AllDispatcher
,对应的会封装AllChannelHandler
,后者会启用线程池,来处理接收到的消息的处理,方式是received
1
2
3
4
5
6
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {}
}
创建一个 ChannelEventRunnable
提交到线程池进行处理,这个过程是异步的,最终请求会层层调用到HeaderExchagneHandler#received
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());// DubboProtocol#requestHandler
}
}
} else if (message instanceof Response) {
// client side received response handle response
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
这边的逻辑比较多
- 响应是一个
Request
类型,这部分逻辑,是作为服务端,被请求时的时候调用的执行的,后面我们会讲到。根据是否想要返回Response
,进行相应的处理,逻辑都比较简单。需要返回值的会使用DubboProtocol#requestHandler#reply
来构建返回值,并发送返回;否则直接调用DubboProtocol#requestHandler#received
做对应的处理。注意这里不是接收心跳,心跳在HeartbeatHandler
中已经处理,不会进入HeaderExchagneHandler
- 响应是一个
Response
类型,这个是主要的处理类型,使用handleResponse(channel, (Response) message);
来处理,
1
2
3
4
5
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
逻辑十分简单,调用DefaultFuture#received
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
// wake up.
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
首先从FUTURES
中根据responseId
获取对应的对应的DefaultFuture
,设置返回值,done.signal()
唤醒正在等待的DefaultFuture#get
方法。
这样我们就构建了 Dubbo 端的Handler
,等待接受服务端返回响应值了,但是还需要和 Netty 客户端创建桥接,因为只有 Netty 客户端才能够接收到 Netty 服务端的返回。这中间的桥梁就是 NettyClientHandler
,在创建 NettyClient
的时候会同时创建 Netty 客户端,并且使用NettyClientHandler
作为处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
接下来就是 Netty 完成的工作了,前面提到,当发送Request
的时候,会调用NettyClientHandler#write
,当有响应返回的时候,调研的则是channelRead
方法
1
2
3
4
5
6
7
8
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
只是很简单的桥接模式,将整个逻辑串联到之前创建的 Dubbo Handler
处理模块中。到这边客户端的逻辑已经全部讲完了。核心只要记住下面几个点
DubboInvoker
作为动态代理的实现,将本地调用封装为远程调用的Invocation
- 使用
ExchangeClient
发起远程调用,ExchangeClient
使用的Channel
也就是NettyClient
发送请求,同时在DefaultFuture
中保留一个Future
,调用方用它来异步获取返回值 NettyClient
使用NettyClientHandler
作为请求和响应的处理器,当响应值返回的时候,会调用DefaultFuture#received
,这样调用方即可获得返回值
服务端逻辑处理
和客户端类似,NettyServer
也使用Handler
处理器来衔接 Netty 框架和 Dubbo 框架。在服务暴露章节中提到的DubboProtocol#requestHandler
作为这个Handler
包裹的最底层处理器。
1
2
3
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
类似的,ChannelHandlers#wrap
也会为其封装上MultipleMessageHandler
、HeartbeatHandler
、AllChannelHandler
等一系列的处理器,这个在客户端通信细节已经讲过了。实际上服务端的通信和客户端的通信,代码几乎是一样的,只是处理的消息类型有区别。
在NettyServer
中,ChannelHandler
被封装为NettyServerHandler
,定位与NettyClientHandler
类似,当接受到Request
的时候,会调用channelRead
方法
1
2
3
4
5
6
7
8
9
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
逻辑与前面的NettyClient
完全一致,不同的是消息的类型,这边是Request
,最终也会调用到HeaderExchangeHandler#received
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// client side received response handle response
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
这个时候执行的就是Request
类型消息的处理逻辑,前面我们有稍微提及到。我们看比较复杂的handleRequest
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
CompletableFuture<Object> future = handler.reply(channel, msg);
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
首先,如果请求不完整,无法解码,返回错误代码,如果请求正常,调用handler.reply(channel, msg);
构建返回值,这里就是我们前面一直说的DubboProtocol#requestHandler
,在这里终于起到了真正的作用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
// set RpcContext
RpcContext rpcContext = RpcContext.getContext();
boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
if (supportServerAsync) {
CompletableFuture<Object> future = new CompletableFuture<>();
rpcContext.setAsyncContext(new AsyncContextImpl(future));
}
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
请求也是一个Invocation
,转换后最重要的就是获取要执行的服务实现类了,在服务暴露的章节,我们将其封装成DubboExporter
并且保存在exporterMap
中,getInvoker
用来获取对应的执行Invoker
,这边补充一下使用 SPI 获取的DubboProtocol
在整个服务端中是一个单例,所以暴露和获取对应的是同一个DubboProtocol
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// key point find the service refer's exporter.
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException();
}
return exporter.getInvoker();
}
逻辑很简单,从exporterMap
中,根据serviceKey
取出Exporter
以及包装在其内的执行器Invoker
返回。Result result = invoker.invoke(inv);
,使用 invoker 执行,并且得到一个CompletableFuture
结果。
回到handleRequest
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
当CompletableFuture
完成之后,将结果通过channel
发送回客户端,这样完成整个服务端请求的调用。
总结
要理解 Dubbo 在通信层的细节处理,理解Exchange
层,非常重要。使用 Netty 实现的 Transporter
层主要的逻辑是通信,也就是发送消息和接受消息,而不关心之间的对应关系。而 RPC 框架的使用者,希望能够像本地调用一样使用远程服务,因此一次请求对应一次响应的模式才是最适合的。Exchange
就在中间层,通过DefaultFuture
以及抽象的Request
和Response
完成这种转换。
消息的发送层和接受层,都有各种封装的Handler
各自完成各自的工作,同时使用大量的代理模式,构建请求和响应的处理过程;通信层,也是使用Channel
作为核心,NettyChannel
实现了 Netty 和 Dubbo 之间的代理转换。在设计上应该是参考了 Netty 中Handler
和Channel
的设计。
微博有开源了一个 RPC,Motan,从代码结构上,可以看成是 Dubbo 的精简,去掉了Exchange
层,而是将Request-Response通信的细节,简化后放到NettyClient
和NettyServer
中来实现。可以和 Dubbo 做对比参考。
服务治理
服务治理,是伴随着 SOA 产生的概念。当一个系统被拆分成为不同的服务,或者一个系统使用不同的服务的时候,都会遇到下面几个问题,这边直接引用 IBM 提出的 SOA 治理定义。
- 服务的定义(服务的范围、接口和边界)
- 服务部署生命周期(各个生命周期阶段)
- 服务版本治理(包括兼容性)
- 服务迁移(启用和停用)
- 服务注册中心(依赖关系)
- 服务消息模型(规范数据模型)
- 服务监控(进行问题定位)
- 服务所有权(明确组织结构责任)
- 服务测试(重复测试)
- 服务安全(包括可接受的保护范围)
可以知道,服务治理的概念实际上非常广泛,而且更加偏重在如何制定一个服务策略,SOA 服务的拥有者和开发者经常设计到不同的部门或者是开发团队。服务治理需要保证有效地协调,使得服务能够真正得到共享和重用,这些是服务治理要关心的问题,而且往往不仅是技术问题。
Dubbo 中关于服务治理相关的技术实现,可以参考服务治理过程演进
上面是整个 Dubbo 服务治理体系中的主要的参与方,可以看到大部分的工作,需要再前端,也就是客户端来完成,从大致的原理上,Dubbo 和服务调用相关的配置信息,都是通过 URL 来传递的,并且有下面的优先级关系
- 方法级优先,接口级次之,全局配置再次之
- 如果级别一样,则消费方优先,提供方次之
客户端实现服务治理,比如负载均衡、服务路由、授权等,都可以通过从注册中心中获取值或者变更回调来动态地修改,如同前面的notify
方法触发Invoker
的动态更新,同时还可以更新路由策略等等。
这样,通过注册中心作为中间媒介,就可以实现对于服务的治理,Dubbo 也提供了 Dubbo Admin (Dubbo-ops)作为工具,对服务调用的配置进行管理,这些配置信息会被写入到Zookeeper中,然后反映到客户端和服务端。到这里就可以实现服务治理中,对服务的管理,比如负载均衡权重的调整、服务的临时下线、服务禁用、服务路由、集群容错等功能,服务降级可以通过在客户端实现 mock 机制来实现
服务的监控,则是通过 Dubbo Monitor 来统计调用的次数以及相关的调用参数,这需要有一个可以在调用请求的时候,收集调用信息,并发送到 Monitor 的方法,在 Dubbo 中是 MonitorFilter
,它是一个Filter
,顾名思义,它可以在 Server 端和 Client 端都进行一些拦截和过滤操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
String remoteHost = context.getRemoteHost();
long start = System.currentTimeMillis(); // record start timestamp
getConcurrent(invoker, invocation).incrementAndGet(); // count up
try {
Result result = invoker.invoke(invocation); // proceed invocation chain
collect(invoker, invocation, result, remoteHost, start, false);
return result;
} catch (RpcException e) {
collect(invoker, invocation, null, remoteHost, start, true);
throw e;
} finally {
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
} else {
return invoker.invoke(invocation);
}
}
执行Invoker
的调用之后,使用collect
来收集信息,并将其记录到DubboMonitor#statisticsMap
中,后续如何使用就可以自己灵活定义了。
这里要讲一下MonitorFilter
是如何生效的,Protocol
在通过 SPI 拓展的时候,还会包裹一个 ProtocolFilterWrapper
,无论是RegistryProtocol
还是DubboProtocol
都会其包裹。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
当执行export
或者refer
的时候,都需要使用Invoker
来封装执行单元,ProtocolFilterWrapper
通过构建一个 Invoker
调用链来添加Filter
的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
这是非常典型的责任链模式,首先从 SPI 容器中取出Filter
列表,然后将一层层封装调用链,把各个 Filter
的逻辑,添加到每一次Invoker
调用中
这种 Filter 的机制,再结合 Google 在 Dapper 论文中发表的spanId 就可以实现服务调用的链路追踪。
总结
Dubbo 的服务治理功能的补充,也使其可以作为实现 SOA 架构的框架,不过在服务治理方面依然有一部分功能没有默认提供,比如服务熔断、分布式的配置、服务文档化等,不过我们而言比较大的问题在于对不同客户端的兼容性,这主要是组织结构上面的变化,整个公司体系内引入了很多的外部系统,这些外部系统并没有使用 Dubbo,而且就算是使用了 Dubbo 作为客户端,我们没有一个统一的入口来对这些不同的系统进行管理,这样就给我们的 SOA 服务化带来了一些麻烦。