轻量级RPC框架BEEHIVE介绍
小橘子🍊

简介

项目地址

beehive 是一款轻量级的 RPC 框架,通过 spring 容器来管理 bean,做到了对用户代码的零入侵,同时通过 spi 拓展机制,实现了自己的 ioc 容器,使得 beehive 能够很方便的对组件进行拓展。

功能特性

  • 实现了 SPI 拓展机制,能够方便的进行组件的自定义和替换

  • 提供了两种代理方式的支持(JDK 原生、Javassist)

  • 底层通信采用 Netty 框架,保证稳定性和高效性

  • 对 Zookeeper 注册中心的支持,能够自动的侦测服务的状态,同步进行更新

  • 完成了对与 fastjson 和 hessian 两种序列化器的支持

  • 整合 spring 容器,对用户代码零入侵,使用方便

  • 在客户端实现了两种负载均衡策略的支持(随机选取,轮询选取)

使用方式

beehive 加入了对于 spring 容器的支持,使得它在使用过程中可以做到对用户代码的零入侵,使用方式和 dubbo 很类似,在服务端,只需要定义要发布的服务的接口类型,服务的实现类,以及注册中心的地址即可完成启动,其他的一些相关参数可以作为备选项,这里给出一个使用样例。

provider.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beehive="https://www.aprilyolies.top/schema/beehive"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
https://www.aprilyolies.top/schema/beehive https://www.aprilyolies.top/schema/beehive.xsd">

<bean id="demoServiceImpl" class="top.aprilyolies.service.BeehiveServiceImpl"/>

<bean id="userServiceImpl" class="top.aprilyolies.service.UserServiceImpl"/>

<beehive:service id="demoService" service="top.aprilyolies.service.BeehiveService"
ref="demoServiceImpl" proxy-factory="jdk" serializer="hessian" server-port="7442"/>

<beehive:service id="userService" service="top.aprilyolies.service.UserService"
ref="userServiceImpl" proxy-factory="javassist" serializer="hessian" server-port="7442"/>

<beehive:registry id="registry" address="zookeeper://119.23.247.86:2181"/>
</beans>

这是服务端的 spring 配置文件,其中 id=”demoServiceImpl” 和 id=”userServiceImpl” 的 bean 就是服务的实现类,而 <beehive:service/> 标签中的 service=”top.aprilyolies.service.BeehiveService” 就是待发布服务的接口类型,另外一个 <beehive:service/> 类似,最后 <beehive:registry/> 标签中定义的就是注册中心地址,目前仅支持 zookeeer。

说完了配置文件,再看看启动程序,很简单,就是一个典型的 spring 容器启动程序如下,不做过多说明。

1
2
3
4
5
6
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("provider.xml");
context.start();
System.out.println("Provider started on thread " + Thread.currentThread().getName() + "..");
System.in.read();
}

启动服务端程序后就是客户端程序了,要做的配置也非常简单,仅仅是通过 beehive 引入服务即可,具体的 rpc 过程对于用户来说是绝对透明的,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beehive="https://www.aprilyolies.top/schema/beehive"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
https://www.aprilyolies.top/schema/beehive https://www.aprilyolies.top/schema/beehive.xsd">

<beehive:registry id="registry" address="zookeeper://119.23.247.86:2181"/>

<beehive:reference id="demoService" service="top.aprilyolies.service.BeehiveService" load-balance="poll"
serializer="hessian" read-timeout="1000" retry-times="2"/>

<beehive:reference id="userService" service="top.aprilyolies.service.UserService" load-balance="poll"
serializer="hessian" read-timeout="1000" retry-times="2"/>
</beans>

基本跟服务端的配置相似,注册中心的配置是一样的,只有引用服务的标签变成为 <beehive:reference/>,该标签的属性设置也跟服务端的服务发布标签有所不同,关于详细的属性说明请看下文。

客户端启动程序服务端一样简单,直接启动程序,查看输出结果即可,不做过多说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Consumer {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
context.start();
BeehiveService demoService = context.getBean("demoService", BeehiveService.class);
long start = System.currentTimeMillis();
for (int i = 0; i < 50000; i++) {
String hello = demoService.say("world - " + i);
System.out.println("result: " + hello);
Thread.sleep(500);
}
System.out.println(System.currentTimeMillis() - start);
}
}

参数设置说明

<beehive:service/> 标签

  • id:为 bean 在 spring 容器中的唯一标识符

  • service:将要发布的服务,值为你想要发布的服务的全限定名

  • ref:你所发布服务的实现类,它的值为 spring <bean/> 标签的 id 值

  • proxy-factory:服务提供端代理类创建的方式,目前支持 javassist 和 jdk 可选

  • serializer:序列化器,默认为 fastjson,可选 hessian

  • server-port:服务启动的端口号,默认使用 7440,你也可以再启动程序时通过 -Dport=端口号 来指定,优先级最高

<beehive:reference/> 标签

  • id:为 bean 在 spring 容器中的唯一标识符

  • service:将要发布的服务,值为你想要发布的服务的全限定名

  • load-balance:负载均衡设置,由客户端实现,目前只支持 random 和 poll 两种方式

  • proxy-factory:服务提供端代理类创建的方式,目前支持 javassist 和 jdk 可选

  • serializer:序列化器,默认为 fastjson,可选 hessian

  • read-timeout:指定 rpc 结果读取超时时间,如果本次结果获取失败,将会重试

  • retry-times:指定重试次数,即 rpc 结果获取超时重试次数

<beehive:registry/> 标签

  • id:为 bean 在 spring 容器中的唯一标识符

  • address:注册中心的地址,目前只支持 zookeeper,格式如 “zookeeper://host:port”

样例测试

基本测试

项目中提供了实例程序(位于 beehive-demo)模块下,通过 git clone 将工程拉取下来后,在根目录下输入如下指令进行安装。

mvn clean install -Dmaven.test.skip=true

因为需要用到注册中心,所以实例程序中注册中心的地址是我的阿里云服务器地址,正常情况下我会启动 zookeeper 服务,那么示例程序就会将服务注册到我的阿里云服务器的 zookeeper 上,当然你也可以在本机启动一个 zookeeper,然后修改 spring 配置文件中的注册中心地址。

启动服务器,如果你没有修改实例程序的配置文件,默认使用我阿里云的 zookeeper,输入如下指令:

java -jar beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar

启动客户端,没有修改代码的情况下,会从注册中心获取服务信息,输入如下指令:

java -jar beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar

如果过程没有错误的话,在控制台将会打印 rpc 的结果如下:

1
2
3
4
5
6
7
8
9
10
result: Jim say world - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 1 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 2 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 3 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 4 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 5 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 6 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 7 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 8 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 9 from 192.168.1.105, [ server id is No.1]

或者启动多线程的 consumer,输入如下指令:

java -cp beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.consumer.MultiThreadConsumer

我这里是启动了 5 个线程来进行的 rpc 请求,参数附带了当前线程的信息,所以 rpc 的结果也包含了线程信息,结果:

1
2
3
4
5
6
7
8
9
10
11
result: Jim say MultiThreadConsumer-pool-thread-3 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-3 - 1 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-1 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-4 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-5 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-2 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-3 - 2 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-1 - 1 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-2 - 1 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-5 - 1 from 192.168.1.105, [ server id is No.1]

服务切换的测试

示例程序中提供了两个服务端程序,表示两个服务提供者,测试服务切换需要同时启动这两个程序,指令如下:

java -cp beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.provider.Provider

java -cp beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.provider.AnotherProvider

启动客户端,没有修改代码的情况下,会从注册中心获取服务信息,输入如下指令:

java -jar beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar

因为测试程序中默认使用的是轮询负载均衡机制,所以客户端会从所有的服务提供者中逐个调用,得到的结果如下,根据最后的 server
id 就能够分辨出来当前是请求的哪个服务提供者。

1
2
3
4
5
6
7
8
9
10
result: Jim say world - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 1 from 192.168.1.105, [ server id is No.2]
result: Jim say world - 2 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 3 from 192.168.1.105, [ server id is No.2]
result: Jim say world - 4 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 5 from 192.168.1.105, [ server id is No.2]
result: Jim say world - 6 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 7 from 192.168.1.105, [ server id is No.2]
result: Jim say world - 8 from 192.168.1.105, [ server id is No.1]
result: Jim say world - 9 from 192.168.1.105, [ server id is No.2]

或者启动多线程的 consumer,输入如下指令:

java -cp beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.consumer.MultiThreadConsumer

1
2
3
4
5
6
7
8
9
10
11
result: Jim say MultiThreadConsumer-pool-thread-2 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-2 - 1 from 192.168.1.105, [ server id is No.2]
result: Jim say MultiThreadConsumer-pool-thread-4 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-1 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-5 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-3 - 0 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-2 - 2 from 192.168.1.105, [ server id is No.1]
result: Jim say MultiThreadConsumer-pool-thread-3 - 1 from 192.168.1.105, [ server id is No.2]
result: Jim say MultiThreadConsumer-pool-thread-5 - 1 from 192.168.1.105, [ server id is No.2]
result: Jim say MultiThreadConsumer-pool-thread-1 - 1 from 192.168.1.105, [ server id is No.2]
result: Jim say MultiThreadConsumer-pool-thread-4 - 1 from 192.168.1.105, [ server id is No.2]

可以看到多线程的环境下,rpc 的调用逻辑和单线程是保持一致的。

实例程序默认是使用的轮询负载均衡机制,如果过程没错的话,那么你将会看到客户端会交替的从两个 provider 进行 rpc 调用。

尝试关掉其中一个 provider,客户端会侦测到这个变化,随即将这个下线的 provider 剔除,仅仅从剩下的 provider 中进行 rpc 调用。

再尝试重启这个 provider,客户端也会侦测到这个变化,随即将这个 provider 加入到可调用的 providers 列表中,进而进行 rpc 调用。

底层实现原理

标签解析

beehive 实现了自己的标签解析器,spring 容器能够对这些标签对应的 bean 的创建及初始化进行管理,beehive 自定义的标签只有 <beehive:registry/>、<beehive:service/>、<beehive:reference/> 三个,分别对应注册中心、服务发布、服务调用三个逻辑,相应的 BeanDefinitionParser 是由 top.aprilyolies.beehive.spring.namespace.BeehiveNamespaceHandler 完成注册的,它的代码内容如下,就是为每个 beehive 对应的标签注册了一个 BeanDefinitionParser,用于解析 spring 配置文件中的 beehive 标签。

1
2
3
4
5
6
7
8
9
10
11
public class BeehiveNamespaceHandler extends NamespaceHandlerSupport {
@Override
public void init() {
// beehive:registry 标签解析器
registerBeanDefinitionParser("registry", new RegistryBeanDefinitionParser());
// beehive:service 标签解析器
registerBeanDefinitionParser("service", new ServiceBeanDefinitionParser());
// beehive:reference 标签解析器
registerBeanDefinitionParser("reference", new ReferenceBeanDefinitionParser());
}
}

因为三个 BeanDefinitionParser 的基本逻辑一样,我这里仅以 ServiceBeanDefinitionParser 进行大致说明,先贴出 ServiceBeanDefinitionParser 的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ServiceBeanDefinitionParser extends AbstractBeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
// 构建 RootBeanDefinition,用于承载解析出来的信息
RootBeanDefinition beanDefinition = new RootBeanDefinition();
// 指定解析的 bean 的类型
beanDefinition.setBeanClass(ServiceProvider.class);
// 不使用懒加载
beanDefinition.setLazyInit(false);
String id = element.getAttribute("id");
String service = element.getAttribute("service");
beanDefinition.getPropertyValues().addPropertyValue("service", service);
// 如果没有指定 id 属性
if (StringUtils.isEmpty(id)) {
String name = element.getAttribute("name");
beanDefinition.getPropertyValues().addPropertyValue("name", name);
// 尝试使用 name 来确定 id 信息
if (!isExisted(parserContext, name)) {
id = name;
} else {
// name 不行的话,那么就使用 service 来确定 id 信息
String beanName = getBeanName(service);
id = beanName;
int count = 1;
while (isExisted(parserContext, id)) {
id = beanName + count;
}
}
}
if (!StringUtils.isEmpty(id)) {
// 相同 id 只能存在一个
if (isExisted(parserContext, id)) {
throw new IllegalStateException(String.format("Bean of id %s has existed", id));
}
beanDefinition.getPropertyValues().addPropertyValue("id", id);
// 进行注册信息,这一步一定不能落下
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
}
// 解析 protocol 属性
parseAttribute(element, beanDefinition, "protocol", "beehive");
// 解析 proxy 属性
parseAttribute(element, beanDefinition, "proxy-factory", "javassist");
// 解析 serializer 属性
parseAttribute(element, beanDefinition, "serializer", "fastjson");
// 解析 serializer 属性
parseAttribute(element, beanDefinition, "server-port", "7440");
String ref = element.getAttribute("ref");
// 如果 setter 方法对应的属性为 ref,并且 spring 容器中已经注册过这个 ref 所引用的 bean 的 beanDefinition
if (parserContext.getRegistry().containsBeanDefinition(ref)) {
// 那就拿到这个 beanDefinition
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(ref);
// 这个 beanDefinition 必须是单例的
if (!refBean.isSingleton()) {
throw new IllegalStateException("The exported service ref " + ref + " must be singleton! Please set the " + ref + " bean scope to singleton, eg: <bean id=\"" + ref + "\" scope=\"singleton\" ...>");
}
}
RuntimeBeanReference reference = new RuntimeBeanReference(ref);
beanDefinition.getPropertyValues().addPropertyValue("ref", reference);
return beanDefinition;
}
}

这个类实现了 org.springframework.beans.factory.xml.BeanDefinitionParser 接口,他只有一个 BeanDefinition parse(Element element, ParserContext parserContext); 方法,参数 element 就是 spring 配置文件中 beehive 对应的那个标签元素,参数 parserContext 就是一个标签解析的上下文环境,可以从中获取到已经解析的标签的信息。在 parse 方法中我们就需要实现自己的标签解析逻辑,主要就是创建了一个 RootBeanDefinition 实例,然后将其和正在解析的标签所对应的类进行绑定,也就是 beanDefinition.setBeanClass(ServiceProvider.class); 这个方法,接下来就是将标签中解析出来的属性逐个的添加到刚刚创建的 RootBeanDefinition 实例中,这样 RootBeanDefinition 就包含了类信息及相关的参数信息,最后就是将 RootBeanDefinition 实例返回,交由 spring 容器进行管理,这包括 bean 是否是单例,不同的初始化阶段的声明周期方法的调用,字段的自动装配等等。

服务端启动

服务端启动主要是完成了三件事,打开数据通信服务器、创建服务接口服务端代理,向注册中心发布服务,执行的先后顺序如下图:

服务端启动核心逻辑

在完成标签的解析后,标签对应的 bean 就会交由 spring 容器管理,根据 bean 的实现接口的类型,spring 容器会在不同的阶段完成 bean 对应接口方法的调用,服务端的启动就是如此。先看看服务发布标签对应 bean(ServiceProvider)的实现接口 public class ServiceProvider extends ServiceConfigBean implements ApplicationListener<ContextRefreshedEvent>, InitializingBean, ApplicationContextAware,对于 ApplicationListener 接口的实现类,spring 容器在初始化完成后进行该接口对应方法的调用,InitializingBean 接口则是在进行 bean 的初始化时进行调用,而 ApplicationContextAware 是一个感知接口,就是该接口的实现类能够拿到 spring 上下文环境,在 beehive 中,则是利用该接口拿到了 spring 上下文环境,通过它注册了一个关闭钩子函数,用于关闭通信端程序。具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
// 注册关闭监听器
// 如果 context 是 ConfigurableApplicationContext 接口的实例
if (applicationContext instanceof ConfigurableApplicationContext) {
// spring 框架的方法,向 jvm 注册一个关闭钩子函数,在 jvm 关闭时会调用这个钩子函数来关闭 applicationContext
((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
}
// 注册关闭监听器
addApplicationListener(new ShutdownHookListener());
}

/**
* beehive 的关闭监听器,它会监听 context 关闭事件,同时关闭相关的组件
*/
public class ShutdownHookListener implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
// 此监听器只会监听 context 关闭事件
if (event instanceof ContextClosedEvent) {
BeehiveShutdownHook.closeAll();
}
}
}

// 关闭注册的 Registry
public static void closeAll() {
for (Registry registry : registries) {
registry.close();
}
for (EndPoint endPoint : endPoints) {
endPoint.close();
}
}

而实现 InitializingBean 接口主要是为了注入 registry 对应的 bean 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 用于为当前 bean 填充一些必要属性
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
checkRegistry();
}

/**
* 检查当前 bean 的 registry 属性是否为空,否则从 spring 容器中获取对应的值进行填充
*/
private void checkRegistry() {
RegistryConfigBean registry = getRegistry();
if (registry == null) {
if (applicationContext != null) {
Map<String, RegistryConfigBean> registryMap = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
RegistryConfigBean.class, false, false);
if (registryMap.size() > 0) {
// 如果 spring 容器中有多个 registry bean,那么优先获取第一个
Collection<RegistryConfigBean> registries = registryMap.values();
for (RegistryConfigBean reg : registries) {
setRegistry(reg);
break;
}
}
}
}
}

服务端启动的主要逻辑是在 ApplicationListener 接口对应的方法中完成的,该方法中直接是调用了 exportService(); 方法,该方法中主要是两个部分,准备 registryUrls 和进行真正的服务注册,准备 registryUrls 就只是一个信息拼凑的过程,逻辑简单,只是过程比较繁琐,感兴趣的话可以自行去看源码,最终的拼凑结果我通过 debug 的方式显示出来了,请看下图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
exportService();
}

public void exportService() {
if (!published) {
synchronized (publishMonitor) {
if (!published) {
if (logger.isDebugEnabled()) {
logger.debug("Export service via thread " + Thread.currentThread().getName());
}
// 将当前实例以 provider_model 的形式保存到 BeehiveContext 中
BeehiveContext.unsafePut(UrlConstants.PROVIDER_MODEL, this);
// 根据 registry 属性构建 registry urls
List<URL> registryUrls = getRegistryUrl(getRegistry());
// 将当前实例中的属性填充到 url 的参数中
fillParameters(registryUrls, this);
// 检查 registry url 是否包含必要的属性
checkRegistryUrls(registryUrls);
// 进行真正的服务注册
registryService(registryUrls);
}
published = true;
}
}
}
registryUrls的内容

这里重点关注第二部分进行真正的服务注册方法 registryService(registryUrls);,它根本是调用的 AbstractRegistry#registry 方法,在该方法中主要就是完成了服务端启动的三个核心逻辑,从其方法名就可以知道了。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void registry(URL url) {
if (url == null)
throw new IllegalArgumentException("Can't publish a service for null url");
try {
openServer(url);
createInvoker(url);
doPublish(url);
} catch (Exception e) {
logger.error("publish service failed", e.getCause());
}
}

先看 openServer(url); 方法,它会根据 url 确定当前终端是服务端还是客户端,因为我们现在是分析的服务端启动,所以走的是上边那条逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void openServer(URL url) {
String protocol = url.getOriginUrl().getParameterElseDefault(UrlConstants.SERVICE_PROTOCOL, UrlConstants.DEFAULT_SERVICE_PROTOCOL);
URL serviceUrl = URL.copyFromUrl(url.getOriginUrl());
serviceUrl.setOriginUrl(url.getOriginUrl());
serviceUrl.setProtocol(protocol);
if (url.isProvider()) {
protocolSelector.publish(serviceUrl);
} else if (!url.isProvider()) {
protocolSelector.subscribe(serviceUrl);
}
}

该方法的实现是这样的,优先从缓存中获取 server 实例,如果没有的话就新建一个 server 实例并进行缓存,server 实例的创建是在 NettyTransporter#bind 方法中完成的,可以看到这里创建的是一个 NettyServer,也就是说 beehive 的底层是采用的 netty 作为通信框架的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public Server bind(URL url) {
String serverKey = buildServerKey(url);
Server server = serverCache.get(serverKey);
if (server == null) {
synchronized (serverCache) {
if (serverCache.get(serverKey) == null) {
server = new NettyServer(url);
AbstractConfig.BeehiveShutdownHook.addEndPoint(server);
serverCache.putIfAbsent(serverKey, server);
}
}
}
return server;
}

知道 openServer(url); 的实现后,再看 createInvoker(url); 方法,它就是一个创建服务接口代理类的过程,考虑到拓展性,所以 beehive 的处理过程还将创建出来的代理进行了封装,最终返回的是一个 Invoker 链,我们先看看根本的代理类的创建过程,它的实现是在 AbstractProxyFactory#createInvoker 方法中,里边有两个逻辑,创建原生的代理类,和将代理类封装成为 Invoker 实现类,这样做是为了方便后边的 Invoker 链的构建。beehive 提供了两种代理类生成的方式,这里只是为了说明代理的逻辑,所以选择稍微简单的 jdk 原生代理的方式进行说明,而 javassist 方式生成代理的过程可以自行参考源码理解。

jdk 原生代理类生成的入口方法为 JdkProxyFactory#createProxy,我们是服务端代理类生成,走上一个分支,可以看到它根本是调用的 ProviderProxy.getJdkProxy(target, clazz, Proxy.class); 方法,其中 target 就是我们在 spring 配置文件中指定的服务接口的实现类,class 是服务接口对应的 class,而 Proxy.class 则是返回的接口类型,也就是说我们生成的代理类是实现了两个接口的,服务接口和 Proxy 接口,且返回的表现形式是 Proxy。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected Proxy createProxy(Class<?> clazz, URL url) {
if (url.isProvider()) {
// 根据 url 信息获取 invoke target 实例
ServiceConfigBean serviceConfigBean = BeehiveContext.unsafeGet(UrlConstants.PROVIDER_MODEL, ServiceConfigBean.class);
Object target = serviceConfigBean.getRef();
return ProviderProxy.getJdkProxy(target, clazz, Proxy.class);
} else {
Invoker invoker = getInvoker(url);
return ConsumerProxy.getJdkProxy(new InvokerInvocationHandler(invoker, url), clazz, Proxy.class);
}
}

知道这一点后,我们再直接看 jdk 代理类的构建细节,也就是如下代码所示的那样,可以看到代理的逻辑是由 ProviderInvocationHandler 指定的,它持有了我们在 spring 配置文件中指定的服务接口的实现类,当代理类需要执行某个方法时,真正调用的其实是服务接口实现类的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 创建 jdk 代理类
*
* @param classes
* @return
*/
public static Proxy getJdkProxy(Object target, Class<?>... classes) {
if (classes.length < 1) {
throw new IllegalStateException("Can't create jdk proxy for none of interface has specified");
}
ClassLoader classLoader = classes[0].getClassLoader();
ProviderInvocationHandler handler = new ProviderInvocationHandler(target);
return (Proxy) java.lang.reflect.Proxy.newProxyInstance(classLoader, classes, handler);
}

/**
* provider 的 jdk invocation handler
*/
private static class ProviderInvocationHandler implements InvocationHandler {

private final Object target;

public ProviderInvocationHandler(Object target) {
this.target = target;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(target, args);
}
}

返回的 jdk 代理是是被封装成了 ProxyWrapperInvoker,beehive 接下来就是根据该 invoker 构建 invoker 链,至于 invoker 链的其他节点,则是通过 spi 拓展机制来指定的,但是关于 invoker 逻辑功能,我暂时是做的空实现,所以采用的是硬编码的方式,如果以后有新的需求了,就会考虑将其实现改为适配 spi 拓展,关乎 invoker 链,这里采用的是责任链模式,具体的构建过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 通过 filter 构建 invoker 链,最后一个 invoker 就是我们创建的 ProxyWrapperInvoker,它封装了我们真正的调用逻辑
*
* @param invoker 原始的 invoker
* @return 通过 filter 构建出来的 invoker 链
*/
public static Invoker buildInvokerChain(Invoker<?> invoker) {
// TODO 这里的 filter 获取应该通过 ExtensionLoader
List<Filter> filters = new ArrayList<>();
filters.add(new AccessLogFilter());
filters.add(new MonitorFilter());
Invoker ptr = invoker;
if (filters.size() > 0) {
for (Filter filter : filters) {
final Invoker next = ptr;
Invoker pre = new AbstractInvoker() {
@Override
protected Object doInvoke(InvokeInfo info) {
return filter.doFilter(next, info);
}
};
ptr = pre;
}
}
//noinspection unchecked
return ptr;
}

可以看到封装我们代理类的 ProxyWrapperInvoker 节点位于 invoker 链的末尾,而最终返回的是 invoker 链的头结点,这样在进行逻辑调用时,就会先执行拓展逻辑,最后再执行我们的代理逻辑。

最后一个就是 doPublish(url); 方法,它做的事情很简单,就是向注册中心也就是 zookeeper 中写入当前发布的服务的相关信息,这里的 zookeeper 客户端使用的 curator 框架,具体的过程很简单,这里只给出写入服务信息后的结果。

zookeeper 中写入的服务信息

执行到这里,服务端的启动计算完成了,这个启动过程主要就是做了三件事情,启动数据交互的服务终端,创建逻辑调用的代理类,将其封装为 Invoker 链,之后就是向注册中心发布服务,接下来继续读客户端的启动进行说明。

客户端启动

在了解过服务端的启动之后,再看客户端的启动流程就会很简单,跟客户端一样,它主要也是实现了三个逻辑,在这三个逻辑中,就只有代理类的创建比较复杂,而在数据交互的终端创建和消费者信息注册方面,过程基本一致,所以这里主要就是对代理类的创建方式进行说明。

客户端启动核心逻辑

首先还是看和服务调用相关的标签 <beehive:reference/> 对应的 bean(ServiceConsumer)所实现的接口,相比于服务端,客户端实现的接口就只有一个 FactoryBean,它指定了三个方法,根据 spring 的实现规则,知道在获取实现 FactoryBean 接口的 bean 时,它实际返回的是 getObject() 方法返回的实例。根据这个特性,所以 beehive 客户端的服务接口代理类就是通过该方法返回的,我们重点关注这个方法。

1
2
3
4
5
public interface FactoryBean<T> {
T getObject() throws Exception;
Class<?> getObjectType();
boolean isSingleton();
}

getObject() 方法和服务端的 ServiceProvider#onApplicationEvent 方法很相似,分为准备 registryUrls 信息和订阅 registryUrl 两部分(客户端的订阅和服务端的发布相对应),准备 registryUrls 信息的过程就是个简单但是比较繁琐的过程,可以自行看源码了解,而订阅 registryUrl 的逻辑根本就是调用的 AbstractRegistry#registry 方法,它跟服务端调用的是同样的方法。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void registry(URL url) {
if (url == null)
throw new IllegalArgumentException("Can't publish a service for null url");
try {
openServer(url);
createInvoker(url);
doPublish(url);
} catch (Exception e) {
logger.error("publish service failed", e.getCause());
}
}

所以客户端启动做的三个核心逻辑和服务端是一样的,只是具体的实现不一样,先看 openServer(url);,它根本是调用的 NettyTransporter#connect 方法,可以看到这里我们创建的是 NettyClient,这和服务端的 NettyServer 对应。

1
2
3
4
5
6
@Override
public Client connect(URL url) {
NettyClient nettyClient = new NettyClient(url);
AbstractConfig.BeehiveShutdownHook.addEndPoint(nettyClient);
return nettyClient;
}

再看 createInvoker(url); 方法的实现,我们这里启动客户端,所以走的是 else 分支,首先代码是从 zookeeper 注册中心中获取已经发布的服务信息,也就是服务端所发布的服务信息。接着调用 ZookeeperRegistry#addProviderRefreshListener 方法,其实就是通过 curator 框架向 zookeeper 注册了一个监听器,它会监听服务上下线的信息,以此来达到更新 provider 列表的目的,关键的代码如下,添加的 listener 为 ProviderRefreshListener,可以看到它监听 CHILD_ADDED、CHILD_REMOVED、CHILD_UPDATED 三种事件,所作出的动作是 BeehiveContext.unsafePut(PROVIDERS, providerUrls); 更新 provider 列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 为 consumer 添加一个监听器,用于监测 provider 更新消息
*
* @param providerPath
*/
private void addProviderRefreshListener(String providerPath) {
try {
PathChildrenCache pathCache = new PathChildrenCache(zkClient, providerPath, true);
pathCache.start();
pathCache.getListenable().addListener(new ProviderRefreshListener(pathCache));
} catch (Exception e) {
logger.error("Add provider refresh listener failed, the target path was " + providerPath + ", the curator client was " + zkClient);
e.printStackTrace();
}
}

/**
* 该 listener 用于刷新 provider 信息
*/
private class ProviderRefreshListener implements PathChildrenCacheListener {
private final PathChildrenCache pathCache;

public ProviderRefreshListener(PathChildrenCache pathCache) {
this.pathCache = pathCache;
}

@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
// 如果 child 信息发生变化,进行更新
switch (eventType) {
case CHILD_ADDED:
case CHILD_REMOVED:
case CHILD_UPDATED: {
List<ChildData> currentData = pathCache.getCurrentData();
List<String> providerUrls = new ArrayList<>(currentData.size());
for (ChildData data : currentData) {
providerUrls.add(data.getPath());
}
// 这里是保存到 concurrent hash map 中,能够保证可见性
BeehiveContext.unsafePut(PROVIDERS, providerUrls);
}

}
}
}

接着就是创建服务接口的客户端代理,这里以 jdk 的客户端代理进行说明,还记得在服务端创建 jdk 代理时有获取一个代理 target 实例吗?它就是服务接口的实现类,完成了具体提服务逻辑。但现在我们是在创建客户端的代理,那么这里的代理 target 实例是什么呢?其实可以先试想一下,我们客户端在调用接口方法时,代理类应该完成啥样的逻辑呢?

beehive 这里的实现其实就是将被调用的接口方法和参数相关信息发送到服务端,服务端再根据收到的方法及参数信息完成真正的服务接口实现类的逻辑调用,最终再将处理的结果返回给客户端代理,最终代理将结果返回给方法调用者,完成这个发送方法及参数信息给服务端的类就是这里应该拿到的 target 实例,通过 debug,可以知道这个实例其实就是 FailoverClusterInvoker。它里边的实现逻辑不是这里应该关注的点,我们只需要知道,客户端的服务接口代理根本是调用的 FailoverClusterInvoker 实例的方法。知道这个后,jdk 代理类的创建就跟服务端一样了,具体到代码就是下边这一段,getInvoker(url) 拿到的是 FailoverClusterInvoker,它将被作为 target 而创建服务端接口代理类。

top.aprilyolies.beehive.proxy.JdkProxyFactory#createProxy

1
2
Invoker invoker = getInvoker(url);
return ConsumerProxy.getJdkProxy(new InvokerInvocationHandler(invoker, url), clazz, Proxy.class);

完成客户端服务接口代理类创建的说明,就只剩下 doPublish(url);,跟服务端一样就是向注册中心写入自身的信息,使用的是 curator 框架,过程很简单,不做细说,只给出写入的结果。

zookeeper 中写入的消费者信息

客户端 RPC 请求流程

当完成客户端和服务端的启动后,客户端就可以进行 rpc 请求了,还记的上文指出的 FailoverClusterInvoker 实例吗?它是客户端代理类真正逻辑的起点。从类名可以知道它是一个 Invoker 类,自然是实现了 Invoker 接口,这样我们关注的也就是该接口对应的 Invoker#invoke 方法,它的实现是在父类 AbstractInvoker 之中,该方法又是调用了 AbstractInvoker#doInvoke 方法,子类负责实现该方法的逻辑,这里是典型的模板方法设计模式。

我先把 FailoverClusterInvoker#doInvoke 方法贴出来,第一个 if 判断实现看看缓存中是否有 invokers 信息,如果没有就会执行 listInvokers 方法来列出可用的 invokers。

top.aprilyolies.beehive.invoker.FailoverClusterInvoker#doInvoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
protected Object doInvoke(InvokeInfo info) {
try {
if (this.invokers == null || registry == null) {
synchronized (this) {
if (invokers == null || registry == null) {
invokers = listInvokers();
Registry registry = BeehiveContext.unsafeGet(REGISTRIES, Registry.class);
addInvokersRefreshListener(this.registry);
this.registry = registry;
}
}
}
if (needUpdateInvokers) {
updateInvokers();
}
int reInvokeCount = retryCountThreadLocal.get();
LoadBalance loadBalance = loadBalanceThreadLocal.get();
if (loadBalance == null) {
loadBalance = createLoadBalance(url);
loadBalanceThreadLocal.set(loadBalance);
}
// 选择合适的 invoker
Invoker<T> invoker = selectInvoker(loadBalance, invokers);
if (invoker != null) {
Invoker chain = buildInvokerChain(invoker);
Object result = chain.invoke(info);
if (result == null) {
// 辅助刷新 invokers
needUpdateInvokers = true;
// 返回结果为空进行重试
if (reInvokeCount++ < MAX_REINVOKE_TIMES) {
retryCountThreadLocal.set(reInvokeCount);
return doInvoke(info);
} else {
// 重试达到上限,直接返回 null 结果
throw new IllegalStateException("Do invoke " + MAX_REINVOKE_TIMES + " times, but the result was still null");
}
} else return result;
} else {
throw new IllegalStateException("There is none of service provider could be use, please check your " +
"registry center that if there is any service has published.");
}
} finally {
retryCountThreadLocal.set(0);
}
}

如下就是列出可用 invokers 信息的方法,首先是拿到注册中心上发布的服务信息的集合,在服务端启动时,会首先获取一次该集合并缓存到 BeehiveContext 中,如果后续发生了服务信息的更新即服务的上下线等,客户端启动时注册的监听器会侦测该变化,并刷新服务信息集合。接着就是过滤这些服务信息,因为即便是相同的服务,可能会由于发布服务时使用的编解码方式与客户端使用的编解码方式不一致导致服务不能直接访问,这样该服务也就应该被过滤掉。过滤的条件可以更加细化,如果你有好的建议也可以提交 PR,我看到后会进行处理。完成服务信息的过滤后接着就是获取当前服务对应的通信终端,最后将获取的服务信息和通信终端构建为 RemoteInvoker 返回。

top.aprilyolies.beehive.invoker.FailoverClusterInvoker#listInvokers

1
2
3
4
5
6
7
8
9
10
private synchronized List<Invoker<T>> listInvokers() {
//noinspection unchecked
Map<String, List<String>> providersMap = BeehiveContext.unsafeGet(UrlConstants.PROVIDERS, Map.class);
List<String> providers = providersMap.get(url.getParameter(UrlConstants.SERVICE));
providers = filterProviders(providers);
this.providers = providers;
Client client = BeehiveContext.unsafeGet(UrlConstants.CONSUMERS_TRANSPORT, Client.class);
assert providers != null;
return createRemoteInvoker(providers, client);
}

FailoverClusterInvoker#doInvoke 的第二个 if 条件是判断是否服务发生了更新,如果发生了更新就需要刷新 invokers 列表,刷新的过程和列出 invokers 列表的逻辑相似,只是这其中涉及到一个集合操作,即剔除下线的服务,新增上线的服务,感兴趣的话可以自己看源代码。

接下来的代码就是从列出的 invokers 列表中选择一个 invoker 构建 invoker 链,然后进行 invoke 调用,选择 invoker 的过程我将其描述为一个负载均衡的过程,默认提供了两种负载均衡策略,随机选取和轮询选取,具体的由 PollLoadBalance 和 RandomLoadBalance 两个类来负责实现,代码也很简单,贴出来自行阅读即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RandomLoadBalance extends AbstractLoadBalance {
private static final Random random = new Random(System.currentTimeMillis());

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers) {
try {
if (invokers == null || invokers.size() == 0) {
return null;
}
int idx = Math.abs(random.nextInt() % invokers.size());
if (logger.isDebugEnabled()) {
logger.debug("There are " + invokers.size() + " invokers, RandomLoadBalance choose the invoker with" +
"index of " + idx);
}
return invokers.get(idx);
} catch (Exception e) {
logger.error("Got invoker failed, this may caused by some new provider was added, and the beehive" +
" was refresh the invokers list");
return null;
}
}

}

public class PollLoadBalance extends AbstractLoadBalance {
private int idx = 0;

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers) {
int size = invokers.size();
if (idx >= size) {
idx = 0;
}
return invokers.get(idx++);
}
}

FailoverClusterInvoker#selectInvoker 方法就是通过上述的两个负载均衡器选择一个合适的 invoker 的方法,拿到这个 invoker 后就是构建 invoker 链,这个过程和服务端启动的 invoker 链构建过程一样,这里不再赘述。接着就是调用 invoker 链的 invoke 方法,根据之前的分析我们知道 invoker 链的尾节点就是我们通过负载均衡器选择出来的那个 invoker,也就是我们在 FailoverClusterInvoker#listInvokers 方法中列出的众多 invokers 中的一个,他们都是 RemoteInvoker 实例,所以我们接下来分析该 invoker 的 invoke 方法。

同样的 RemoteInvoker 是继承自 AbstractInvoker 父类,我们关注的是 doInvoke 方法的实现。这里还是先把代码贴出来,考虑到篇幅问题,我这里剃掉了部分空指针判断条件和失败重试代码,只留下了核心代码。简化后的代码可以很清楚的看到核心逻辑就三个,连接服务端,发送 rpc 请求信息,获取响应结果。连接服务端和发送 rpc 请求和 netty 的使用有关,如果不熟悉 netty 需要先自行了解 netty 的使用,本文这里不细述。而这里获取响应的结果是采用的异步的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
protected Object doInvoke(InvokeInfo info) {
try {
// 获取当前线程的 channelMap
Map<String, Channel> channelMap = addressChannel.get();
Channel ch = channelMap.get(channelKey);
if (ch == null || !ch.isOpen() || !ch.isActive()) {
// 连接服务器
ch = connectServer();
}
// 构建 request 消息
Request request = buildRequest(info);
// 发送消息
ch.writeAndFlush(request);
// 获取异步的响应结果
Object result = getResponse(request);
String retryTimes = this.url.getParameter(UrlConstants.RETRY_TIMES);
int times = RETRY_TIMES;
if (!StringUtils.isEmpty(retryTimes)) {
try {
times = Integer.parseInt(retryTimes);
} catch (NumberFormatException e) {
}
}
return result;
} catch (Exception e) {
return null;
}
}

获取 rpc 结果的入口方法是 RemoteInvoker#getResponse,这里需要明确的一点是每一个 rpc 请求都有它所唯一对应的 id 来标志,rpc 请求结果的异步获取利用了这个特性。进行当前 rpc 请求的线程将本次请求的 id 和一个 RpcResult 形成映射缓存到 BeehiveContext,然后当前线程调用该 RpcResult 的 get 方法来获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 从 BeehiveContext 中获取异步的响应结果
*
* @param request
* @return
*/
private Object getResponse(Request request) {
// 拿到请求的 id,用于异步获取响应内容
String sid = String.valueOf(request.getId());
// 存根请求结果
BeehiveContext.unsafePut(sid, new RpcResult());
// 异步获取相应结果
Object res = null;
try {
String timeout = this.url.getParameter(UrlConstants.READ_TIMEOUT);
if (StringUtils.isEmpty(timeout)) {
res = BeehiveContext.unsafeGet(sid, RpcResult.class).get();
} else {
try {
int time = Integer.parseInt(timeout);
res = BeehiveContext.unsafeGet(sid, RpcResult.class).get(time);
} catch (NumberFormatException e) {
logger.warn("The timeout parameter " + timeout + " is wrong, use the default timeout 2000ms");
res = BeehiveContext.unsafeGet(sid, RpcResult.class).get();
}
}
} catch (Throwable t) {
// empty
}
// 移除缓存
BeehiveContext.unsafeRemove(sid);
return res;
}

下边列出了 RpcResult 的 get 方法,可以看到这里使用了 Condition 来完成线程之间的通信。如果 rpc 请求过程的完成标志 finished 为 false,进行 get 方法的当前线程会进行限时阻塞。如果在阻塞的过程中 rpc 过程完成,当前被阻塞的线程会被唤醒继续之前的逻辑。如果发生超时,那么之前的代码会根据情况进行请求的重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 带延时的获取相应的结果
*
* @param timeout 延时时长
* @return 相应的结果
*/
public Object get(int timeout) {
if (finished) {
return this.msg;
} else {
try {
lock.lock();
if (!finished) {
finishCondition.await(timeout, TimeUnit.MILLISECONDS);
if (!finished) {
throw new RuntimeException("Get result was timeout");
} else {
return this.msg;
}
}
return this.msg;
} catch (InterruptedException e) {
logger.error("Get result was interrupted");
throw new RuntimeException("Get result was interrupted");
} finally {
lock.unlock();
}
}
}

异步获取 rpc 请求的结果的方式就是这样,这里有个疑问是谁来负责唤醒当前的阻塞线程呢?这个就需要牵涉到 netty 的 channel handler 对于 rpc 结果的处理了,这里我们只需要知道如果 rpc 结果在有限时长内完成,那么当前线程会顺利的返回 rpc 的结果,如果超时就会进行重试,而负责唤醒当前阻塞线程的其它线程是跟 netty 有关的。

netty 通信客户端

上边说到了进行 rpc 请求的线程在通过 netty 发送 rpc 信息后,调用 get 方法会阻塞在 rpc 结果的获取方法上,这部分即对 rpc 信息的发送编码过程进行说明,这里需要先明确 rpc 的信息是由 Request 实例承载的,我这里通过 debug 的方式给出一个 Request 实例的样例。可以看到其中就两个字段,消息的类型和 RPCINFO,里边包括了请求的方法名、参数的类型、参数的值以及服务的全限定名,通过这几项值就可以唯一确定服务的具体实现了。

Request 内部数据实例

上文只是说了 NettyClient 的构建,但是具体的内容没有说明,其实关于 netty 客户端的创建,我们需要关心的就是 ChannelInitializer 对于 NioSocketChannel 的初始化过程,具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", new NettyDecoderHandler(getUrl())) // 指定 decoder -> InternalDecoder
.addLast("encoder", new NettyEncoderHandler(getUrl())) // 指定 encoder -> InternalEncoder
// 该处理器用于向服务器发送心跳消息
.addLast("client-idle-handler", new IdleStateHandler(HEARTBEAT_INTERVAL, 0, 0, TimeUnit.MILLISECONDS))
.addLast("heartbeat-handler", new HeartbeatHandler()) // 该处理器主要是对心跳消息进行处理
.addLast("handler", new ClientFinalChannelHandler(getUrl())); // 最后的 handler,就是核心的逻辑处理器
}
});

这里的 IdleStateHandler 和 HeartbeatHandler 是和心跳消息相关联的,感兴趣可以自行阅读,对于我们的 Request 实例的发送和响应的接收而言,只跟 另外三个 ChannelHandler 相关,因为我们现在是发送 Request 包,直接定位到 NettyEncoderHandler#encode 方法如下,至于为什么会调用该方法,这就和 netty 的 channel pipeline 上的 channel handler context 与 channel handler 有关了,如果对 netty 的源码不熟,就会很难理解,如果有时间我后边会写一些关于 netty 源码的文章。好了,我们这里是 Request,走 requestEncode(ctx, msg, out); 分支。

1
2
3
4
5
6
7
8
9
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
serializer = extensionSelector.serializer(url, out);
if (msg instanceof Request) {
requestEncode(ctx, msg, out);
} else if (msg instanceof Response) {
responseEncode(ctx, msg, out);
}
}

具体的代码如下,其实每一步我在注释中已经写得很详细了,我这里只要给出数据包的模型就能够很容易理解编码的过程,唯一要注意的是我们这里的 Request 是 RpcRequest,所以在进行数据包体的编码时,是从 Request 中获取到 data 即 RpcInfo 进行编码。

魔幻数字(0 - 1) 标志位(2) 保留位(3) 序列号(4 - 11) 数据包体长度(12 - 15)
占用 2 字节 占用 1 字节 占用 1 字节 占用 8 字节 占用 4 字节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private void requestEncode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws IOException {
Request request = (Request) msg;
if (logger.isDebugEnabled()) {
logger.debug("Encode request message of " + request);
}
// 构建请求头
byte[] header = new byte[HEADER_LENGTH];
// 填充魔幻数字
ByteUtils.fillShort(MAGIC, header, 0);
// 填充请求头的标志字节
header[2] = (byte) (REQUEST_FLAG | extensionSelector.getSerializerId(url));
// 填充请求 id
ByteUtils.fillLong(request.getId(), header, 4);
// 这是 header 的起始位置
int headerIndex = out.writerIndex();
// 将 writer 指针定位到 body 写入的起始位置
int bodyIndex = headerIndex + HEADER_LENGTH;
out.writerIndex(bodyIndex);
// 对于 rpc 请求的编码和事件消息的编码是不一样的
if (request.isEvent()) {
header[2] = (byte) (header[2] | EVENT_FLAG);
encodeEventRequest(request.getData());
} else {
encodeRpcRequest(request.getData());
}
serializer.flushBuffer();
int len = out.writerIndex() - bodyIndex;
// 填充 header 的长度信息
ByteUtils.fillInt(len, header, 12);
// 将位置定位到 header 起始位置
out.writerIndex(headerIndex);
// 写入头信息
out.writeBytes(header);
// 将指针定位到全部数据写入后应该在的位置
out.writerIndex(bodyIndex + len);
}

/**
* 将 rpc 相关的信息写入,不采用直接写入 object 的方式是为了最大的降低数据传输的量
*
* @param msg
* @throws IOException
*/
private void encodeRpcRequest(Object msg) throws IOException {
RpcInfo info = (RpcInfo) msg;
// 请求的服务的名字
serializer.writeUTF(info.getServiceName());
// 请求的方法名
serializer.writeUTF(info.getMethodName());
// 请求的方法参数类型(用签名的方式表示)
serializer.writeUTF(ReflectUtils.getDesc(info.getPts()));
// 方法调用的参数值
Object[] pvs = info.getPvs();
if (pvs != null) {
for (Object pv : pvs) {
serializer.writeObject(pv);
}
}
}

将编码后的结果发送给服务端后,服务端会进行解码,然后根据解码的结果完成对应的服务接口实现类方法的调用,这个稍后进行说明,方法调用的结果,服务端也是通过 netty 发送回来,同样的数据包也是经过编码的,客户端需要对其进行解码,才能得到具体的结果,这个过程是在 NettyDecoderHandler#decode 方法中完成的,至于为什么是在这个方法,那么又需要涉及到 netty 的源码实现了。在该方法中,核心就是调用 NettyDecoderHandler#prepareDecode 方法,如果能够理解编码的过程,那么解码就是一个逆向过程,结合我在代码中的注释应该就能理解,这里也不再赘述.需要注意的是 NettyDecoderHandler#prepareDecode 方法解码出来的结果会在 NettyDecoderHandler#decode 中添加到 List<Object> out 集合中,然后在父类中会逐个的将里边的元素取出并交由下一个 ChannelHandler 处理,这里也就是我们在创建 NettyClient 时指定的 ClientFinalChannelHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.isReadable()) {
int readerIndex = in.readerIndex();
serializer = extensionSelector.deserializer(url, in);
int pre = count;
Object result = prepareDecode(ctx, in, out);
if (result == EMPTY_RESULT) {
in.readerIndex(readerIndex);
count = pre;
break;
} else {
int len = count - pre;
in.readerIndex(readerIndex + len);
out.add(result);
}
}
}

private Object prepareDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
int readable = in.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
in.readBytes(header);
// 如果魔幻头不一致,那么就需要重新定位到新的魔幻头
if ((readable > 0 && header[0] != MAGIC_LOW) || (readable > 1 && header[1] != MAGIC_HIGH)) {
int length = header.length;
if (header.length < readable) { // 这里说明够头部的长度
// 这里就说明,除了 header 的字节部分,还有真正的数据部分
header = ByteUtils.copyOf(header, readable);
// 这样就是将全部数据读到了 header 中,前 16 个为 header 部分,后边的为剩余数据
in.readBytes(header, length, readable - length); // 全部的数据都到了 header 中
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_LOW && header[i + 1] == MAGIC_HIGH) { // 这里是重新确定魔幻头的位置
// 这里就说明又检测到了一个数据包,那么就将第一个数据包的内容放到 header 中
in.readerIndex(in.readerIndex() - header.length + i); // 将 readerIndex 重新定位到新的魔幻头位置
readable = in.readableBytes();
header = new byte[Math.min(readable, HEADER_LENGTH)];
in.readBytes(header);
break;
}
}
// 一次只解析一个数据包
}
// check length.
if (readable < HEADER_LENGTH) { // 接收到的数据还不足以解析出一个完整的数据包
return EMPTY_RESULT;
}

// get data length. 根据 header 获取 len 信息
int len = ByteUtils.readInt(header, 12);
// 总长度为 header + 数据长度(header 后四位进行记录)
int tt = len + HEADER_LENGTH;
if (readable < tt) { // 也就是说目前接受到的数据还不够 header + 包长度得到的总长度
return EMPTY_RESULT;
}
count += tt;
// limit input stream.通过 ChannelBufferInputStream 对 buffer 进行封装,记录了内容读取的上下界
// ChannelBufferInputStream -> NettyBackedCahnnelBuffer -> Netty 原生 buf
return doDecode(header); // 解码除开 header 以外的信息
}

private Object doDecode(byte[] header) {
byte flag = header[2], proto = (byte) (flag & SERIALIZER_MASK);
// get request id.
long id = ByteUtils.readLong(header, 4);
// 如果标志位是不是 request,那就说明是收到的 response
if ((flag & REQUEST_FLAG) == 0) {
// decode response.
Response res = new Response(id);
// get status.
byte status = header[3];
res.setStatus(status);
// 判断当前相应是否为事件响应
try {
try {
if ((flag & EVENT_FLAG) == 0) {
res.setType(MessageType.RESPONSE);
Object msg = serializer.readObject();
res.setData(msg);
} else {
res.setType(MessageType.HEARTBEAT_RESPONSE);
Object msg = serializer.readObject();
res.setData(msg);
}
} catch (Exception e) {
logger.error("Decode message error, please check provider and consumer use the same serializer");
}
} catch (Throwable e) {
e.printStackTrace();
}
return res;
}
}

清除 netty 原理的话就会知道在 ClientFinalChannelHandler 中接收上个 ChannelHandler 的处理结果的方法是 ClientFinalChannelHandler#channelRead,这个方法很简单,就只是向线程池中提交了一个 EventHandleThread 任务,它持有了解码出来的结果,那么接下来就是看这个 Task 对我们的解码结果做了什么处理。直接查看它的 run 方法。可以知道它就是直接调用 handleMsg();方法。因为我们收到的是服务端的 Response 数据包,所以只关注该方法的 else if (msg instanceof Response) 分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 处理接收到的消息,此处的处理怎么看都不够优雅
*
* @throws IllegalAccessException
* @throws InstantiationException
*/
private void handleMsg() throws IllegalAccessException, InstantiationException {
// 这里是对于非事件 request 的处理方式
else if (msg instanceof Response) {
Response response = (Response) msg;
if (logger.isDebugEnabled()) {
logger.debug("Received response message of " + response);
}
byte status = response.getStatus();
// 只对相应状态为 ok 的 response 进行处理
if (Response.OK == status) {
Object data = response.getData();
RpcResult result = getRpcResult(response);
if (result != null) {
// 完成相应结果的填充
result.fillData(data);
}
}
}
}

可以看到所做的事情就是从 Response 中拿到响应的数据,然后将其填充到 RpcResult 中,特别注意,这个 RpcResult 是和 rpc Request 的 id 对应的,所以填充的结果也是针对对应的 RpcResult,而在 RpcResult#fillData 方法中,就是完成了结果的填充,并唤醒了阻塞在当前 rpc 请求对应的线程,这样 rpc 请求的结果就被服务接口客户端代理返回给接口的方法调用者。

netty 通信服务端

在服务端的启动中,我们也只是说了创建了 NettyServer,但是具体的细节没有进行说明,现在就完成这一部分的解释。和客户端一样,我们需要关注的是 ChannelInitializer 对于 NioSocketChannel 的初始化,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 从 url 中获取 idleTimeout 时长,如果 url 参数中没有指定,那么就直接使用三倍的 heartBeat 时长
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", new NettyDecoderHandler(getUrl())) // InternalDecoder
.addLast("encoder", new NettyEncoderHandler(getUrl())) // InternalEncoder
// 用于检测 channel 空闲状态,条件成立时关闭对应的 channel,相对的客户端的 Idle 处理器则用于发送心跳消息
.addLast("server-idle-handler", new IdleStateHandler(0, 0, IDLE_TIMEOUT, MILLISECONDS))
.addLast("heartbeat-handler", new HeartbeatHandler())
.addLast("final-handler", new ServerFinalChannelHandler(getUrl()));
}
});

上文已经说过了前四个 ChannelHandler 的作用及使用了,服务端对于他们的使用基本一致,所以也不再赘述,最后一个 ChannelHandler 和 NettyClient 的最后一个 ChannelHandler 的作用基本一致,都是向线程池中提交了一个 EventHandleThread,里边的方法是一样的,只是因为处理的是来自客户端的 Request 包,所以走的是 EventHandleThread#handleMsg 方法的 if (msg instanceof Request) 分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 这里是对于非事件 request 的处理方式
if (msg instanceof Request) {
Request request = (Request) msg;
if (logger.isDebugEnabled()) {
logger.debug("Received request message of " + request);
}
if (!request.isEvent()) {
// 根据 request 相关的内容构建 response
Response response = buildResponse(request);
Object data = request.getData();
// 如果 request 携带的数据为 RpcInfo,那么就根据其进行相应的 invoker 调用
if (data instanceof RpcInfo) {
Object result = doInvoke((RpcInfo) data);
// 将 invoke 的结果填充到 response 中
response.setData(result);
// 将响应结果写回
ctx.writeAndFlush(response);
}
}
}

/**
* 根据 RpcInfo 进行逻辑调用,获取调用的结果
*
* @param data
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
private Object doInvoke(RpcInfo data) throws InstantiationException, IllegalAccessException {
RpcInfo info = data;
// 尝试从 BeehiveContext 中获取 invoker 实例
Invoker invoker = BeehiveContext.unsafeGet(info.getServiceName(), Invoker.class);
// 根据 url 信息获取 invoke target 实例
ServiceConfigBean serviceConfigBean = BeehiveContext.unsafeGet(UrlConstants.PROVIDER_MODEL, ServiceConfigBean.class);
Object target = serviceConfigBean.getRef();
// 进行真正的 invoke 操作
return invoker.invoke(info.createInvokeInfo(target));
}

在该分支中,首先是构建了一个 Response 实例,这个实例的 id 是和 Request 一致的,这样客户端收到该 Response 数据包后才知道对应哪个 RpcResult 实例。接下来,就是从收到的 Request 中拿到数据 RpcInfo,里边包含了 rpc 的服务名、方法名、参数类型签名、参数值。EventHandleThread#doInvoke 方法就是根据以上这些进行服务接口实现类方法的调用。可以看到它先从 BeehiveContext 中拿到了一个 Invoker 实例,其实它就是在服务端启动时创建的 Invoker 链。其最后一个节点就封装了服务接口的实现类。当我们进行 invoker 链的 invoke 方法调用时,最终就会触发服务接口的实现类对应方法的调用,返回的就是该方法的执行结果。拿到该结果后,就是将它填充到 Response 实例中,调用 tx.writeAndFlush(response); 将响应结果返回,这样一个完成的 rpc 过程就算完成了。

TODO-LIST

  • 底层通信框架的支持有待完善,比如 Mina(我没接触过)

  • 缺少一个服务的管理模块,框架当前提供了相应的 filter 接口,通过实现该接口即可对 rpc 请求进行拦截,由此来获取部分监控信息

  • SPI 拓展机制没有完成相应的组件筛选功能,比如 filter 接口实现类,框架没有提供基础的选择性获取方式

  • 异步消息处理的线程池的构建应该更加灵活,相应的拒绝执行策略有待完善

  • 负载均衡策略缺陷严重,应该需要根据实际的 provider 负载情况来动态的调整