简介 项目地址
beehive 是一款轻量级的 RPC 框架,通过 spring 容器来管理 bean,做到了对用户代码的零入侵,同时通过 spi 拓展机制,实现了自己的 ioc 容器,使得 beehive 能够很方便的对组件进行拓展。
功能特性
实现了 SPI 拓展机制,能够方便的进行组件的自定义和替换
提供了两种代理方式的支持(JDK 原生、Javassist)
底层通信采用 Netty 框架,保证稳定性和高效性
对 Zookeeper 注册中心的支持,能够自动的侦测服务的状态,同步进行更新
完成了对与 fastjson 和 hessian 两种序列化器的支持
整合 spring 容器,对用户代码零入侵,使用方便
在客户端实现了两种负载均衡策略的支持(随机选取,轮询选取)
使用方式 beehive 加入了对于 spring 容器的支持,使得它在使用过程中可以做到对用户代码的零入侵,使用方式和 dubbo 很类似,在服务端,只需要定义要发布的服务的接口类型,服务的实现类,以及注册中心的地址即可完成启动,其他的一些相关参数可以作为备选项,这里给出一个使用样例。
provider.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beehive="https://www.aprilyolies.top/schema/beehive" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd https://www.aprilyolies.top/schema/beehive https://www.aprilyolies.top/schema/beehive.xsd" > <bean id="demoServiceImpl" class="top.aprilyolies.service.BeehiveServiceImpl" /> <bean id="userServiceImpl" class="top.aprilyolies.service.UserServiceImpl" /> <beehive:service id="demoService" service="top.aprilyolies.service.BeehiveService" ref="demoServiceImpl" proxy-factory="jdk" serializer="hessian" server-port="7442" /> <beehive:service id="userService" service="top.aprilyolies.service.UserService" ref="userServiceImpl" proxy-factory="javassist" serializer="hessian" server-port="7442" /> <beehive:registry id="registry" address="zookeeper://119.23.247.86:2181" /> </beans>
这是服务端的 spring 配置文件,其中 id=”demoServiceImpl” 和 id=”userServiceImpl” 的 bean 就是服务的实现类,而 <beehive:service/> 标签中的 service=”top.aprilyolies.service.BeehiveService” 就是待发布服务的接口类型,另外一个 <beehive:service/> 类似,最后 <beehive:registry/> 标签中定义的就是注册中心地址,目前仅支持 zookeeer。
说完了配置文件,再看看启动程序,很简单,就是一个典型的 spring 容器启动程序如下,不做过多说明。
1 2 3 4 5 6 public static void main (String[] args) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext ("provider.xml" ); context.start(); System.out.println("Provider started on thread " + Thread.currentThread().getName() + ".." ); System.in.read(); }
启动服务端程序后就是客户端程序了,要做的配置也非常简单,仅仅是通过 beehive 引入服务即可,具体的 rpc 过程对于用户来说是绝对透明的,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beehive="https://www.aprilyolies.top/schema/beehive" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd https://www.aprilyolies.top/schema/beehive https://www.aprilyolies.top/schema/beehive.xsd" > <beehive:registry id="registry" address="zookeeper://119.23.247.86:2181" /> <beehive:reference id="demoService" service="top.aprilyolies.service.BeehiveService" load-balance="poll" serializer="hessian" read-timeout="1000" retry-times="2" /> <beehive:reference id="userService" service="top.aprilyolies.service.UserService" load-balance="poll" serializer="hessian" read-timeout="1000" retry-times="2" /> </beans>
基本跟服务端的配置相似,注册中心的配置是一样的,只有引用服务的标签变成为 <beehive:reference/>,该标签的属性设置也跟服务端的服务发布标签有所不同,关于详细的属性说明请看下文。
客户端启动程序服务端一样简单,直接启动程序,查看输出结果即可,不做过多说明。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Consumer { public static void main (String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext ("consumer.xml" ); context.start(); BeehiveService demoService = context.getBean("demoService" , BeehiveService.class); long start = System.currentTimeMillis(); for (int i = 0 ; i < 50000 ; i++) { String hello = demoService.say("world - " + i); System.out.println("result: " + hello); Thread.sleep(500 ); } System.out.println(System.currentTimeMillis() - start); } }
参数设置说明 <beehive:service/> 标签
id:为 bean 在 spring 容器中的唯一标识符
service:将要发布的服务,值为你想要发布的服务的全限定名
ref:你所发布服务的实现类,它的值为 spring <bean/> 标签的 id 值
proxy-factory:服务提供端代理类创建的方式,目前支持 javassist 和 jdk 可选
serializer:序列化器,默认为 fastjson,可选 hessian
server-port:服务启动的端口号,默认使用 7440,你也可以再启动程序时通过 -Dport=端口号 来指定,优先级最高
<beehive:reference/> 标签
id:为 bean 在 spring 容器中的唯一标识符
service:将要发布的服务,值为你想要发布的服务的全限定名
load-balance:负载均衡设置,由客户端实现,目前只支持 random 和 poll 两种方式
proxy-factory:服务提供端代理类创建的方式,目前支持 javassist 和 jdk 可选
serializer:序列化器,默认为 fastjson,可选 hessian
read-timeout:指定 rpc 结果读取超时时间,如果本次结果获取失败,将会重试
retry-times:指定重试次数,即 rpc 结果获取超时重试次数
<beehive:registry/> 标签
样例测试 基本测试 项目中提供了实例程序(位于 beehive-demo)模块下,通过 git clone 将工程拉取下来后,在根目录下输入如下指令进行安装。
mvn clean install -Dmaven.test.skip=true
因为需要用到注册中心,所以实例程序中注册中心的地址是我的阿里云服务器地址,正常情况下我会启动 zookeeper 服务,那么示例程序就会将服务注册到我的阿里云服务器的 zookeeper 上,当然你也可以在本机启动一个 zookeeper,然后修改 spring 配置文件中的注册中心地址。
启动服务器,如果你没有修改实例程序的配置文件,默认使用我阿里云的 zookeeper,输入如下指令:
java -jar beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar
启动客户端,没有修改代码的情况下,会从注册中心获取服务信息,输入如下指令:
java -jar beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar
如果过程没有错误的话,在控制台将会打印 rpc 的结果如下:
1 2 3 4 5 6 7 8 9 10 result: Jim say world - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 1 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 2 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 3 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 4 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 5 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 6 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 7 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 8 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 9 from 192.168 .1 .105 , [ server id is No.1 ]
或者启动多线程的 consumer,输入如下指令:
java -cp beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.consumer.MultiThreadConsumer
我这里是启动了 5 个线程来进行的 rpc 请求,参数附带了当前线程的信息,所以 rpc 的结果也包含了线程信息,结果:
1 2 3 4 5 6 7 8 9 10 11 result: Jim say MultiThreadConsumer-pool-thread-3 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-3 - 1 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-1 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-4 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-5 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-2 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-3 - 2 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-1 - 1 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-2 - 1 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-5 - 1 from 192.168 .1 .105 , [ server id is No.1 ]
服务切换的测试 示例程序中提供了两个服务端程序,表示两个服务提供者,测试服务切换需要同时启动这两个程序,指令如下:
java -cp beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.provider.Provider
java -cp beehive-demo/provider/target/provider-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.provider.AnotherProvider
启动客户端,没有修改代码的情况下,会从注册中心获取服务信息,输入如下指令:
java -jar beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar
因为测试程序中默认使用的是轮询负载均衡机制,所以客户端会从所有的服务提供者中逐个调用,得到的结果如下,根据最后的 server id 就能够分辨出来当前是请求的哪个服务提供者。
1 2 3 4 5 6 7 8 9 10 result: Jim say world - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 1 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say world - 2 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 3 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say world - 4 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 5 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say world - 6 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 7 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say world - 8 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say world - 9 from 192.168 .1 .105 , [ server id is No.2 ]
或者启动多线程的 consumer,输入如下指令:
java -cp beehive-demo/consumer/target/consumer-1.0-SNAPSHOT-jar-with-dependencies.jar top.aprilyolies.consumer.MultiThreadConsumer
1 2 3 4 5 6 7 8 9 10 11 result: Jim say MultiThreadConsumer-pool-thread-2 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-2 - 1 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say MultiThreadConsumer-pool-thread-4 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-1 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-5 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-3 - 0 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-2 - 2 from 192.168 .1 .105 , [ server id is No.1 ] result: Jim say MultiThreadConsumer-pool-thread-3 - 1 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say MultiThreadConsumer-pool-thread-5 - 1 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say MultiThreadConsumer-pool-thread-1 - 1 from 192.168 .1 .105 , [ server id is No.2 ] result: Jim say MultiThreadConsumer-pool-thread-4 - 1 from 192.168 .1 .105 , [ server id is No.2 ]
可以看到多线程的环境下,rpc 的调用逻辑和单线程是保持一致的。
实例程序默认是使用的轮询负载均衡机制,如果过程没错的话,那么你将会看到客户端会交替的从两个 provider 进行 rpc 调用。
尝试关掉其中一个 provider,客户端会侦测到这个变化,随即将这个下线的 provider 剔除,仅仅从剩下的 provider 中进行 rpc 调用。
再尝试重启这个 provider,客户端也会侦测到这个变化,随即将这个 provider 加入到可调用的 providers 列表中,进而进行 rpc 调用。
底层实现原理 标签解析 beehive 实现了自己的标签解析器,spring 容器能够对这些标签对应的 bean 的创建及初始化进行管理,beehive 自定义的标签只有 <beehive:registry/>、<beehive:service/>、<beehive:reference/> 三个,分别对应注册中心、服务发布、服务调用三个逻辑,相应的 BeanDefinitionParser 是由 top.aprilyolies.beehive.spring.namespace.BeehiveNamespaceHandler
完成注册的,它的代码内容如下,就是为每个 beehive 对应的标签注册了一个 BeanDefinitionParser,用于解析 spring 配置文件中的 beehive 标签。
1 2 3 4 5 6 7 8 9 10 11 public class BeehiveNamespaceHandler extends NamespaceHandlerSupport { @Override public void init () { registerBeanDefinitionParser("registry" , new RegistryBeanDefinitionParser ()); registerBeanDefinitionParser("service" , new ServiceBeanDefinitionParser ()); registerBeanDefinitionParser("reference" , new ReferenceBeanDefinitionParser ()); } }
因为三个 BeanDefinitionParser 的基本逻辑一样,我这里仅以 ServiceBeanDefinitionParser 进行大致说明,先贴出 ServiceBeanDefinitionParser 的源代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class ServiceBeanDefinitionParser extends AbstractBeanDefinitionParser { @Override public BeanDefinition parse (Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition (); beanDefinition.setBeanClass(ServiceProvider.class); beanDefinition.setLazyInit(false ); String id = element.getAttribute("id" ); String service = element.getAttribute("service" ); beanDefinition.getPropertyValues().addPropertyValue("service" , service); if (StringUtils.isEmpty(id)) { String name = element.getAttribute("name" ); beanDefinition.getPropertyValues().addPropertyValue("name" , name); if (!isExisted(parserContext, name)) { id = name; } else { String beanName = getBeanName(service); id = beanName; int count = 1 ; while (isExisted(parserContext, id)) { id = beanName + count; } } } if (!StringUtils.isEmpty(id)) { if (isExisted(parserContext, id)) { throw new IllegalStateException (String.format("Bean of id %s has existed" , id)); } beanDefinition.getPropertyValues().addPropertyValue("id" , id); parserContext.getRegistry().registerBeanDefinition(id, beanDefinition); } parseAttribute(element, beanDefinition, "protocol" , "beehive" ); parseAttribute(element, beanDefinition, "proxy-factory" , "javassist" ); parseAttribute(element, beanDefinition, "serializer" , "fastjson" ); parseAttribute(element, beanDefinition, "server-port" , "7440" ); String ref = element.getAttribute("ref" ); if (parserContext.getRegistry().containsBeanDefinition(ref)) { BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(ref); if (!refBean.isSingleton()) { throw new IllegalStateException ("The exported service ref " + ref + " must be singleton! Please set the " + ref + " bean scope to singleton, eg: <bean id=\"" + ref + "\" scope=\"singleton\" ...>" ); } } RuntimeBeanReference reference = new RuntimeBeanReference (ref); beanDefinition.getPropertyValues().addPropertyValue("ref" , reference); return beanDefinition; } }
这个类实现了 org.springframework.beans.factory.xml.BeanDefinitionParser
接口,他只有一个 BeanDefinition parse(Element element, ParserContext parserContext);
方法,参数 element 就是 spring 配置文件中 beehive 对应的那个标签元素,参数 parserContext 就是一个标签解析的上下文环境,可以从中获取到已经解析的标签的信息。在 parse 方法中我们就需要实现自己的标签解析逻辑,主要就是创建了一个 RootBeanDefinition 实例,然后将其和正在解析的标签所对应的类进行绑定,也就是 beanDefinition.setBeanClass(ServiceProvider.class);
这个方法,接下来就是将标签中解析出来的属性逐个的添加到刚刚创建的 RootBeanDefinition 实例中,这样 RootBeanDefinition 就包含了类信息及相关的参数信息,最后就是将 RootBeanDefinition 实例返回,交由 spring 容器进行管理,这包括 bean 是否是单例,不同的初始化阶段的声明周期方法的调用,字段的自动装配等等。
服务端启动 服务端启动主要是完成了三件事,打开数据通信服务器、创建服务接口服务端代理,向注册中心发布服务,执行的先后顺序如下图:
服务端启动核心逻辑
在完成标签的解析后,标签对应的 bean 就会交由 spring 容器管理,根据 bean 的实现接口的类型,spring 容器会在不同的阶段完成 bean 对应接口方法的调用,服务端的启动就是如此。先看看服务发布标签对应 bean(ServiceProvider)的实现接口 public class ServiceProvider extends ServiceConfigBean implements ApplicationListener<ContextRefreshedEvent>, InitializingBean, ApplicationContextAware
,对于 ApplicationListener 接口的实现类,spring 容器在初始化完成后进行该接口对应方法的调用,InitializingBean 接口则是在进行 bean 的初始化时进行调用,而 ApplicationContextAware 是一个感知接口,就是该接口的实现类能够拿到 spring 上下文环境,在 beehive 中,则是利用该接口拿到了 spring 上下文环境,通过它注册了一个关闭钩子函数,用于关闭通信端程序。具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { this .applicationContext = applicationContext; if (applicationContext instanceof ConfigurableApplicationContext) { ((ConfigurableApplicationContext) applicationContext).registerShutdownHook(); } addApplicationListener(new ShutdownHookListener ()); } public class ShutdownHookListener implements ApplicationListener { @Override public void onApplicationEvent (ApplicationEvent event) { if (event instanceof ContextClosedEvent) { BeehiveShutdownHook.closeAll(); } } } public static void closeAll () { for (Registry registry : registries) { registry.close(); } for (EndPoint endPoint : endPoints) { endPoint.close(); } }
而实现 InitializingBean 接口主要是为了注入 registry 对应的 bean 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public void afterPropertiesSet () throws Exception { checkRegistry(); } private void checkRegistry () { RegistryConfigBean registry = getRegistry(); if (registry == null ) { if (applicationContext != null ) { Map<String, RegistryConfigBean> registryMap = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfigBean.class, false , false ); if (registryMap.size() > 0 ) { Collection<RegistryConfigBean> registries = registryMap.values(); for (RegistryConfigBean reg : registries) { setRegistry(reg); break ; } } } } }
服务端启动的主要逻辑是在 ApplicationListener 接口对应的方法中完成的,该方法中直接是调用了 exportService();
方法,该方法中主要是两个部分,准备 registryUrls 和进行真正的服务注册,准备 registryUrls 就只是一个信息拼凑的过程,逻辑简单,只是过程比较繁琐,感兴趣的话可以自行去看源码 ,最终的拼凑结果我通过 debug 的方式显示出来了,请看下图。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public void onApplicationEvent (ContextRefreshedEvent event) { exportService(); } public void exportService () { if (!published) { synchronized (publishMonitor) { if (!published) { if (logger.isDebugEnabled()) { logger.debug("Export service via thread " + Thread.currentThread().getName()); } BeehiveContext.unsafePut(UrlConstants.PROVIDER_MODEL, this ); List<URL> registryUrls = getRegistryUrl(getRegistry()); fillParameters(registryUrls, this ); checkRegistryUrls(registryUrls); registryService(registryUrls); } published = true ; } } }
registryUrls的内容
这里重点关注第二部分进行真正的服务注册方法 registryService(registryUrls);
,它根本是调用的 AbstractRegistry#registry
方法,在该方法中主要就是完成了服务端启动的三个核心逻辑,从其方法名就可以知道了。
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void registry (URL url) { if (url == null ) throw new IllegalArgumentException ("Can't publish a service for null url" ); try { openServer(url); createInvoker(url); doPublish(url); } catch (Exception e) { logger.error("publish service failed" , e.getCause()); } }
先看 openServer(url);
方法,它会根据 url 确定当前终端是服务端还是客户端,因为我们现在是分析的服务端启动,所以走的是上边那条逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected void openServer (URL url) { String protocol = url.getOriginUrl().getParameterElseDefault(UrlConstants.SERVICE_PROTOCOL, UrlConstants.DEFAULT_SERVICE_PROTOCOL); URL serviceUrl = URL.copyFromUrl(url.getOriginUrl()); serviceUrl.setOriginUrl(url.getOriginUrl()); serviceUrl.setProtocol(protocol); if (url.isProvider()) { protocolSelector.publish(serviceUrl); } else if (!url.isProvider()) { protocolSelector.subscribe(serviceUrl); } }
该方法的实现是这样的,优先从缓存中获取 server 实例,如果没有的话就新建一个 server 实例并进行缓存,server 实例的创建是在 NettyTransporter#bind 方法中完成的,可以看到这里创建的是一个 NettyServer,也就是说 beehive 的底层是采用的 netty 作为通信框架的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public Server bind (URL url) { String serverKey = buildServerKey(url); Server server = serverCache.get(serverKey); if (server == null ) { synchronized (serverCache) { if (serverCache.get(serverKey) == null ) { server = new NettyServer (url); AbstractConfig.BeehiveShutdownHook.addEndPoint(server); serverCache.putIfAbsent(serverKey, server); } } } return server; }
知道 openServer(url);
的实现后,再看 createInvoker(url);
方法,它就是一个创建服务接口代理类的过程,考虑到拓展性,所以 beehive 的处理过程还将创建出来的代理进行了封装,最终返回的是一个 Invoker 链,我们先看看根本的代理类的创建过程,它的实现是在 AbstractProxyFactory#createInvoker
方法中,里边有两个逻辑,创建原生的代理类,和将代理类封装成为 Invoker 实现类,这样做是为了方便后边的 Invoker 链的构建。beehive 提供了两种代理类生成的方式,这里只是为了说明代理的逻辑,所以选择稍微简单的 jdk 原生代理的方式进行说明,而 javassist 方式生成代理的过程可以自行参考源码理解。
jdk 原生代理类生成的入口方法为 JdkProxyFactory#createProxy
,我们是服务端代理类生成,走上一个分支,可以看到它根本是调用的 ProviderProxy.getJdkProxy(target, clazz, Proxy.class);
方法,其中 target 就是我们在 spring 配置文件中指定的服务接口的实现类,class 是服务接口对应的 class,而 Proxy.class 则是返回的接口类型,也就是说我们生成的代理类是实现了两个接口的,服务接口和 Proxy 接口,且返回的表现形式是 Proxy。
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected Proxy createProxy (Class<?> clazz, URL url) { if (url.isProvider()) { ServiceConfigBean serviceConfigBean = BeehiveContext.unsafeGet(UrlConstants.PROVIDER_MODEL, ServiceConfigBean.class); Object target = serviceConfigBean.getRef(); return ProviderProxy.getJdkProxy(target, clazz, Proxy.class); } else { Invoker invoker = getInvoker(url); return ConsumerProxy.getJdkProxy(new InvokerInvocationHandler (invoker, url), clazz, Proxy.class); } }
知道这一点后,我们再直接看 jdk 代理类的构建细节,也就是如下代码所示的那样,可以看到代理的逻辑是由 ProviderInvocationHandler 指定的,它持有了我们在 spring 配置文件中指定的服务接口的实现类,当代理类需要执行某个方法时,真正调用的其实是服务接口实现类的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static Proxy getJdkProxy (Object target, Class<?>... classes) { if (classes.length < 1 ) { throw new IllegalStateException ("Can't create jdk proxy for none of interface has specified" ); } ClassLoader classLoader = classes[0 ].getClassLoader(); ProviderInvocationHandler handler = new ProviderInvocationHandler (target); return (Proxy) java.lang.reflect.Proxy.newProxyInstance(classLoader, classes, handler); } private static class ProviderInvocationHandler implements InvocationHandler { private final Object target; public ProviderInvocationHandler (Object target) { this .target = target; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { return method.invoke(target, args); } }
返回的 jdk 代理是是被封装成了 ProxyWrapperInvoker,beehive 接下来就是根据该 invoker 构建 invoker 链,至于 invoker 链的其他节点,则是通过 spi 拓展机制来指定的,但是关于 invoker 逻辑功能,我暂时是做的空实现,所以采用的是硬编码的方式,如果以后有新的需求了,就会考虑将其实现改为适配 spi 拓展,关乎 invoker 链,这里采用的是责任链模式,具体的构建过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static Invoker buildInvokerChain (Invoker<?> invoker) { List<Filter> filters = new ArrayList <>(); filters.add(new AccessLogFilter ()); filters.add(new MonitorFilter ()); Invoker ptr = invoker; if (filters.size() > 0 ) { for (Filter filter : filters) { final Invoker next = ptr; Invoker pre = new AbstractInvoker () { @Override protected Object doInvoke (InvokeInfo info) { return filter.doFilter(next, info); } }; ptr = pre; } } return ptr; }
可以看到封装我们代理类的 ProxyWrapperInvoker 节点位于 invoker 链的末尾,而最终返回的是 invoker 链的头结点,这样在进行逻辑调用时,就会先执行拓展逻辑,最后再执行我们的代理逻辑。
最后一个就是 doPublish(url);
方法,它做的事情很简单,就是向注册中心也就是 zookeeper 中写入当前发布的服务的相关信息,这里的 zookeeper 客户端使用的 curator 框架,具体的过程很简单,这里只给出写入服务信息后的结果。
zookeeper 中写入的服务信息
执行到这里,服务端的启动计算完成了,这个启动过程主要就是做了三件事情,启动数据交互的服务终端,创建逻辑调用的代理类,将其封装为 Invoker 链,之后就是向注册中心发布服务,接下来继续读客户端的启动进行说明。
客户端启动 在了解过服务端的启动之后,再看客户端的启动流程就会很简单,跟客户端一样,它主要也是实现了三个逻辑,在这三个逻辑中,就只有代理类的创建比较复杂,而在数据交互的终端创建和消费者信息注册方面,过程基本一致,所以这里主要就是对代理类的创建方式进行说明。
客户端启动核心逻辑
首先还是看和服务调用相关的标签 <beehive:reference/> 对应的 bean(ServiceConsumer)所实现的接口,相比于服务端,客户端实现的接口就只有一个 FactoryBean,它指定了三个方法,根据 spring 的实现规则,知道在获取实现 FactoryBean 接口的 bean 时,它实际返回的是 getObject()
方法返回的实例。根据这个特性,所以 beehive 客户端的服务接口代理类就是通过该方法返回的,我们重点关注这个方法。
1 2 3 4 5 public interface FactoryBean <T> { T getObject () throws Exception; Class<?> getObjectType(); boolean isSingleton () ; }
getObject()
方法和服务端的 ServiceProvider#onApplicationEvent
方法很相似,分为准备 registryUrls 信息和订阅 registryUrl 两部分(客户端的订阅和服务端的发布相对应),准备 registryUrls 信息的过程就是个简单但是比较繁琐的过程,可以自行看源码了解,而订阅 registryUrl 的逻辑根本就是调用的 AbstractRegistry#registry
方法,它跟服务端调用的是同样的方法。
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void registry (URL url) { if (url == null ) throw new IllegalArgumentException ("Can't publish a service for null url" ); try { openServer(url); createInvoker(url); doPublish(url); } catch (Exception e) { logger.error("publish service failed" , e.getCause()); } }
所以客户端启动做的三个核心逻辑和服务端是一样的,只是具体的实现不一样,先看 openServer(url);
,它根本是调用的 NettyTransporter#connect
方法,可以看到这里我们创建的是 NettyClient,这和服务端的 NettyServer 对应。
1 2 3 4 5 6 @Override public Client connect (URL url) { NettyClient nettyClient = new NettyClient (url); AbstractConfig.BeehiveShutdownHook.addEndPoint(nettyClient); return nettyClient; }
再看 createInvoker(url);
方法的实现,我们这里启动客户端,所以走的是 else 分支,首先代码是从 zookeeper 注册中心中获取已经发布的服务信息,也就是服务端所发布的服务信息。接着调用 ZookeeperRegistry#addProviderRefreshListener
方法,其实就是通过 curator 框架向 zookeeper 注册了一个监听器,它会监听服务上下线的信息,以此来达到更新 provider 列表的目的,关键的代码如下,添加的 listener 为 ProviderRefreshListener,可以看到它监听 CHILD_ADDED、CHILD_REMOVED、CHILD_UPDATED 三种事件,所作出的动作是 BeehiveContext.unsafePut(PROVIDERS, providerUrls);
更新 provider 列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void addProviderRefreshListener (String providerPath) { try { PathChildrenCache pathCache = new PathChildrenCache (zkClient, providerPath, true ); pathCache.start(); pathCache.getListenable().addListener(new ProviderRefreshListener (pathCache)); } catch (Exception e) { logger.error("Add provider refresh listener failed, the target path was " + providerPath + ", the curator client was " + zkClient); e.printStackTrace(); } } private class ProviderRefreshListener implements PathChildrenCacheListener { private final PathChildrenCache pathCache; public ProviderRefreshListener (PathChildrenCache pathCache) { this .pathCache = pathCache; } @Override public void childEvent (CuratorFramework client, PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent.Type eventType = event.getType(); switch (eventType) { case CHILD_ADDED: case CHILD_REMOVED: case CHILD_UPDATED: { List<ChildData> currentData = pathCache.getCurrentData(); List<String> providerUrls = new ArrayList <>(currentData.size()); for (ChildData data : currentData) { providerUrls.add(data.getPath()); } BeehiveContext.unsafePut(PROVIDERS, providerUrls); } } } }
接着就是创建服务接口的客户端代理,这里以 jdk 的客户端代理进行说明,还记得在服务端创建 jdk 代理时有获取一个代理 target 实例吗?它就是服务接口的实现类,完成了具体提服务逻辑。但现在我们是在创建客户端的代理,那么这里的代理 target 实例是什么呢?其实可以先试想一下,我们客户端在调用接口方法时,代理类应该完成啥样的逻辑呢?
beehive 这里的实现其实就是将被调用的接口方法和参数相关信息发送到服务端,服务端再根据收到的方法及参数信息完成真正的服务接口实现类的逻辑调用,最终再将处理的结果返回给客户端代理,最终代理将结果返回给方法调用者,完成这个发送方法及参数信息给服务端的类就是这里应该拿到的 target 实例,通过 debug,可以知道这个实例其实就是 FailoverClusterInvoker。它里边的实现逻辑不是这里应该关注的点,我们只需要知道,客户端的服务接口代理根本是调用的 FailoverClusterInvoker 实例的方法。知道这个后,jdk 代理类的创建就跟服务端一样了,具体到代码就是下边这一段,getInvoker(url)
拿到的是 FailoverClusterInvoker,它将被作为 target 而创建服务端接口代理类。
top.aprilyolies.beehive.proxy.JdkProxyFactory#createProxy
1 2 Invoker invoker = getInvoker(url);return ConsumerProxy.getJdkProxy(new InvokerInvocationHandler (invoker, url), clazz, Proxy.class);
完成客户端服务接口代理类创建的说明,就只剩下 doPublish(url);
,跟服务端一样就是向注册中心写入自身的信息,使用的是 curator 框架,过程很简单,不做细说,只给出写入的结果。
zookeeper 中写入的消费者信息
客户端 RPC 请求流程 当完成客户端和服务端的启动后,客户端就可以进行 rpc 请求了,还记的上文指出的 FailoverClusterInvoker 实例吗?它是客户端代理类真正逻辑的起点。从类名可以知道它是一个 Invoker 类,自然是实现了 Invoker 接口,这样我们关注的也就是该接口对应的 Invoker#invoke
方法,它的实现是在父类 AbstractInvoker 之中,该方法又是调用了 AbstractInvoker#doInvoke
方法,子类负责实现该方法的逻辑,这里是典型的模板方法设计模式。
我先把 FailoverClusterInvoker#doInvoke
方法贴出来,第一个 if 判断实现看看缓存中是否有 invokers 信息,如果没有就会执行 listInvokers 方法来列出可用的 invokers。
top.aprilyolies.beehive.invoker.FailoverClusterInvoker#doInvoke
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override protected Object doInvoke (InvokeInfo info) { try { if (this .invokers == null || registry == null ) { synchronized (this ) { if (invokers == null || registry == null ) { invokers = listInvokers(); Registry registry = BeehiveContext.unsafeGet(REGISTRIES, Registry.class); addInvokersRefreshListener(this .registry); this .registry = registry; } } } if (needUpdateInvokers) { updateInvokers(); } int reInvokeCount = retryCountThreadLocal.get(); LoadBalance loadBalance = loadBalanceThreadLocal.get(); if (loadBalance == null ) { loadBalance = createLoadBalance(url); loadBalanceThreadLocal.set(loadBalance); } Invoker<T> invoker = selectInvoker(loadBalance, invokers); if (invoker != null ) { Invoker chain = buildInvokerChain(invoker); Object result = chain.invoke(info); if (result == null ) { needUpdateInvokers = true ; if (reInvokeCount++ < MAX_REINVOKE_TIMES) { retryCountThreadLocal.set(reInvokeCount); return doInvoke(info); } else { throw new IllegalStateException ("Do invoke " + MAX_REINVOKE_TIMES + " times, but the result was still null" ); } } else return result; } else { throw new IllegalStateException ("There is none of service provider could be use, please check your " + "registry center that if there is any service has published." ); } } finally { retryCountThreadLocal.set(0 ); } }
如下就是列出可用 invokers 信息的方法,首先是拿到注册中心上发布的服务信息的集合,在服务端启动时,会首先获取一次该集合并缓存到 BeehiveContext 中,如果后续发生了服务信息的更新即服务的上下线等,客户端启动时注册的监听器会侦测该变化,并刷新服务信息集合。接着就是过滤这些服务信息,因为即便是相同的服务,可能会由于发布服务时使用的编解码方式与客户端使用的编解码方式不一致导致服务不能直接访问,这样该服务也就应该被过滤掉。过滤的条件可以更加细化,如果你有好的建议也可以提交 PR,我看到后会进行处理。完成服务信息的过滤后接着就是获取当前服务对应的通信终端,最后将获取的服务信息和通信终端构建为 RemoteInvoker 返回。
top.aprilyolies.beehive.invoker.FailoverClusterInvoker#listInvokers
1 2 3 4 5 6 7 8 9 10 private synchronized List<Invoker<T>> listInvokers () { Map<String, List<String>> providersMap = BeehiveContext.unsafeGet(UrlConstants.PROVIDERS, Map.class); List<String> providers = providersMap.get(url.getParameter(UrlConstants.SERVICE)); providers = filterProviders(providers); this .providers = providers; Client client = BeehiveContext.unsafeGet(UrlConstants.CONSUMERS_TRANSPORT, Client.class); assert providers != null ; return createRemoteInvoker(providers, client); }
FailoverClusterInvoker#doInvoke
的第二个 if 条件是判断是否服务发生了更新,如果发生了更新就需要刷新 invokers 列表,刷新的过程和列出 invokers 列表的逻辑相似,只是这其中涉及到一个集合操作,即剔除下线的服务,新增上线的服务,感兴趣的话可以自己看源代码。
接下来的代码就是从列出的 invokers 列表中选择一个 invoker 构建 invoker 链,然后进行 invoke 调用,选择 invoker 的过程我将其描述为一个负载均衡的过程,默认提供了两种负载均衡策略,随机选取和轮询选取,具体的由 PollLoadBalance 和 RandomLoadBalance 两个类来负责实现,代码也很简单,贴出来自行阅读即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class RandomLoadBalance extends AbstractLoadBalance { private static final Random random = new Random (System.currentTimeMillis()); @Override public <T> Invoker<T> select (List<Invoker<T>> invokers) { try { if (invokers == null || invokers.size() == 0 ) { return null ; } int idx = Math.abs(random.nextInt() % invokers.size()); if (logger.isDebugEnabled()) { logger.debug("There are " + invokers.size() + " invokers, RandomLoadBalance choose the invoker with" + "index of " + idx); } return invokers.get(idx); } catch (Exception e) { logger.error("Got invoker failed, this may caused by some new provider was added, and the beehive" + " was refresh the invokers list" ); return null ; } } } public class PollLoadBalance extends AbstractLoadBalance { private int idx = 0 ; @Override public <T> Invoker<T> select (List<Invoker<T>> invokers) { int size = invokers.size(); if (idx >= size) { idx = 0 ; } return invokers.get(idx++); } }
FailoverClusterInvoker#selectInvoker
方法就是通过上述的两个负载均衡器选择一个合适的 invoker 的方法,拿到这个 invoker 后就是构建 invoker 链,这个过程和服务端启动的 invoker 链构建过程一样,这里不再赘述。接着就是调用 invoker 链的 invoke 方法,根据之前的分析我们知道 invoker 链的尾节点就是我们通过负载均衡器选择出来的那个 invoker,也就是我们在 FailoverClusterInvoker#listInvokers
方法中列出的众多 invokers 中的一个,他们都是 RemoteInvoker 实例,所以我们接下来分析该 invoker 的 invoke 方法。
同样的 RemoteInvoker 是继承自 AbstractInvoker 父类,我们关注的是 doInvoke 方法的实现。这里还是先把代码贴出来,考虑到篇幅问题,我这里剃掉了部分空指针判断条件和失败重试代码,只留下了核心代码。简化后的代码可以很清楚的看到核心逻辑就三个,连接服务端,发送 rpc 请求信息,获取响应结果。连接服务端和发送 rpc 请求和 netty 的使用有关,如果不熟悉 netty 需要先自行了解 netty 的使用,本文这里不细述。而这里获取响应的结果是采用的异步的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override protected Object doInvoke (InvokeInfo info) { try { Map<String, Channel> channelMap = addressChannel.get(); Channel ch = channelMap.get(channelKey); if (ch == null || !ch.isOpen() || !ch.isActive()) { ch = connectServer(); } Request request = buildRequest(info); ch.writeAndFlush(request); Object result = getResponse(request); String retryTimes = this .url.getParameter(UrlConstants.RETRY_TIMES); int times = RETRY_TIMES; if (!StringUtils.isEmpty(retryTimes)) { try { times = Integer.parseInt(retryTimes); } catch (NumberFormatException e) { } } return result; } catch (Exception e) { return null ; } }
获取 rpc 结果的入口方法是 RemoteInvoker#getResponse
,这里需要明确的一点是每一个 rpc 请求都有它所唯一对应的 id 来标志,rpc 请求结果的异步获取利用了这个特性。进行当前 rpc 请求的线程将本次请求的 id 和一个 RpcResult 形成映射缓存到 BeehiveContext,然后当前线程调用该 RpcResult 的 get 方法来获取结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private Object getResponse (Request request) { String sid = String.valueOf(request.getId()); BeehiveContext.unsafePut(sid, new RpcResult ()); Object res = null ; try { String timeout = this .url.getParameter(UrlConstants.READ_TIMEOUT); if (StringUtils.isEmpty(timeout)) { res = BeehiveContext.unsafeGet(sid, RpcResult.class).get(); } else { try { int time = Integer.parseInt(timeout); res = BeehiveContext.unsafeGet(sid, RpcResult.class).get(time); } catch (NumberFormatException e) { logger.warn("The timeout parameter " + timeout + " is wrong, use the default timeout 2000ms" ); res = BeehiveContext.unsafeGet(sid, RpcResult.class).get(); } } } catch (Throwable t) { } BeehiveContext.unsafeRemove(sid); return res; }
下边列出了 RpcResult 的 get 方法,可以看到这里使用了 Condition 来完成线程之间的通信。如果 rpc 请求过程的完成标志 finished 为 false,进行 get 方法的当前线程会进行限时阻塞。如果在阻塞的过程中 rpc 过程完成,当前被阻塞的线程会被唤醒继续之前的逻辑。如果发生超时,那么之前的代码会根据情况进行请求的重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public Object get (int timeout) { if (finished) { return this .msg; } else { try { lock.lock(); if (!finished) { finishCondition.await(timeout, TimeUnit.MILLISECONDS); if (!finished) { throw new RuntimeException ("Get result was timeout" ); } else { return this .msg; } } return this .msg; } catch (InterruptedException e) { logger.error("Get result was interrupted" ); throw new RuntimeException ("Get result was interrupted" ); } finally { lock.unlock(); } } }
异步获取 rpc 请求的结果的方式就是这样,这里有个疑问是谁来负责唤醒当前的阻塞线程呢?这个就需要牵涉到 netty 的 channel handler 对于 rpc 结果的处理了,这里我们只需要知道如果 rpc 结果在有限时长内完成,那么当前线程会顺利的返回 rpc 的结果,如果超时就会进行重试,而负责唤醒当前阻塞线程的其它线程是跟 netty 有关的。
netty 通信客户端 上边说到了进行 rpc 请求的线程在通过 netty 发送 rpc 信息后,调用 get 方法会阻塞在 rpc 结果的获取方法上,这部分即对 rpc 信息的发送编码过程进行说明,这里需要先明确 rpc 的信息是由 Request 实例承载的,我这里通过 debug 的方式给出一个 Request 实例的样例。可以看到其中就两个字段,消息的类型和 RPCINFO,里边包括了请求的方法名、参数的类型、参数的值以及服务的全限定名,通过这几项值就可以唯一确定服务的具体实现了。
Request 内部数据实例
上文只是说了 NettyClient 的构建,但是具体的内容没有说明,其实关于 netty 客户端的创建,我们需要关心的就是 ChannelInitializer 对于 NioSocketChannel 的初始化过程,具体的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 bootstrap.handler(new ChannelInitializer () { @Override protected void initChannel (Channel ch) throws Exception { ch.pipeline() .addLast("decoder" , new NettyDecoderHandler (getUrl())) .addLast("encoder" , new NettyEncoderHandler (getUrl())) .addLast("client-idle-handler" , new IdleStateHandler (HEARTBEAT_INTERVAL, 0 , 0 , TimeUnit.MILLISECONDS)) .addLast("heartbeat-handler" , new HeartbeatHandler ()) .addLast("handler" , new ClientFinalChannelHandler (getUrl())); } });
这里的 IdleStateHandler 和 HeartbeatHandler 是和心跳消息相关联的,感兴趣可以自行阅读,对于我们的 Request 实例的发送和响应的接收而言,只跟 另外三个 ChannelHandler 相关,因为我们现在是发送 Request 包,直接定位到 NettyEncoderHandler#encode
方法如下,至于为什么会调用该方法,这就和 netty 的 channel pipeline 上的 channel handler context 与 channel handler 有关了,如果对 netty 的源码不熟,就会很难理解,如果有时间我后边会写一些关于 netty 源码的文章。好了,我们这里是 Request,走 requestEncode(ctx, msg, out);
分支。
1 2 3 4 5 6 7 8 9 @Override protected void encode (ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { serializer = extensionSelector.serializer(url, out); if (msg instanceof Request) { requestEncode(ctx, msg, out); } else if (msg instanceof Response) { responseEncode(ctx, msg, out); } }
具体的代码如下,其实每一步我在注释中已经写得很详细了,我这里只要给出数据包的模型就能够很容易理解编码的过程,唯一要注意的是我们这里的 Request 是 RpcRequest,所以在进行数据包体的编码时,是从 Request 中获取到 data 即 RpcInfo 进行编码。
魔幻数字(0 - 1)
标志位(2)
保留位(3)
序列号(4 - 11)
数据包体长度(12 - 15)
占用 2 字节
占用 1 字节
占用 1 字节
占用 8 字节
占用 4 字节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private void requestEncode (ChannelHandlerContext ctx, Object msg, ByteBuf out) throws IOException { Request request = (Request) msg; if (logger.isDebugEnabled()) { logger.debug("Encode request message of " + request); } byte [] header = new byte [HEADER_LENGTH]; ByteUtils.fillShort(MAGIC, header, 0 ); header[2 ] = (byte ) (REQUEST_FLAG | extensionSelector.getSerializerId(url)); ByteUtils.fillLong(request.getId(), header, 4 ); int headerIndex = out.writerIndex(); int bodyIndex = headerIndex + HEADER_LENGTH; out.writerIndex(bodyIndex); if (request.isEvent()) { header[2 ] = (byte ) (header[2 ] | EVENT_FLAG); encodeEventRequest(request.getData()); } else { encodeRpcRequest(request.getData()); } serializer.flushBuffer(); int len = out.writerIndex() - bodyIndex; ByteUtils.fillInt(len, header, 12 ); out.writerIndex(headerIndex); out.writeBytes(header); out.writerIndex(bodyIndex + len); } private void encodeRpcRequest (Object msg) throws IOException { RpcInfo info = (RpcInfo) msg; serializer.writeUTF(info.getServiceName()); serializer.writeUTF(info.getMethodName()); serializer.writeUTF(ReflectUtils.getDesc(info.getPts())); Object[] pvs = info.getPvs(); if (pvs != null ) { for (Object pv : pvs) { serializer.writeObject(pv); } } }
将编码后的结果发送给服务端后,服务端会进行解码,然后根据解码的结果完成对应的服务接口实现类方法的调用,这个稍后进行说明,方法调用的结果,服务端也是通过 netty 发送回来,同样的数据包也是经过编码的,客户端需要对其进行解码,才能得到具体的结果,这个过程是在 NettyDecoderHandler#decode
方法中完成的,至于为什么是在这个方法,那么又需要涉及到 netty 的源码实现了。在该方法中,核心就是调用 NettyDecoderHandler#prepareDecode
方法,如果能够理解编码的过程,那么解码就是一个逆向过程,结合我在代码中的注释应该就能理解,这里也不再赘述.需要注意的是 NettyDecoderHandler#prepareDecode
方法解码出来的结果会在 NettyDecoderHandler#decode
中添加到 List<Object> out 集合中,然后在父类中会逐个的将里边的元素取出并交由下一个 ChannelHandler 处理,这里也就是我们在创建 NettyClient 时指定的 ClientFinalChannelHandler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { while (in.isReadable()) { int readerIndex = in.readerIndex(); serializer = extensionSelector.deserializer(url, in); int pre = count; Object result = prepareDecode(ctx, in, out); if (result == EMPTY_RESULT) { in.readerIndex(readerIndex); count = pre; break ; } else { int len = count - pre; in.readerIndex(readerIndex + len); out.add(result); } } } private Object prepareDecode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int readable = in.readableBytes(); byte [] header = new byte [Math.min(readable, HEADER_LENGTH)]; in.readBytes(header); if ((readable > 0 && header[0 ] != MAGIC_LOW) || (readable > 1 && header[1 ] != MAGIC_HIGH)) { int length = header.length; if (header.length < readable) { header = ByteUtils.copyOf(header, readable); in.readBytes(header, length, readable - length); } for (int i = 1 ; i < header.length - 1 ; i++) { if (header[i] == MAGIC_LOW && header[i + 1 ] == MAGIC_HIGH) { in.readerIndex(in.readerIndex() - header.length + i); readable = in.readableBytes(); header = new byte [Math.min(readable, HEADER_LENGTH)]; in.readBytes(header); break ; } } } if (readable < HEADER_LENGTH) { return EMPTY_RESULT; } int len = ByteUtils.readInt(header, 12 ); int tt = len + HEADER_LENGTH; if (readable < tt) { return EMPTY_RESULT; } count += tt; return doDecode(header); } private Object doDecode (byte [] header) { byte flag = header[2 ], proto = (byte ) (flag & SERIALIZER_MASK); long id = ByteUtils.readLong(header, 4 ); if ((flag & REQUEST_FLAG) == 0 ) { Response res = new Response (id); byte status = header[3 ]; res.setStatus(status); try { try { if ((flag & EVENT_FLAG) == 0 ) { res.setType(MessageType.RESPONSE); Object msg = serializer.readObject(); res.setData(msg); } else { res.setType(MessageType.HEARTBEAT_RESPONSE); Object msg = serializer.readObject(); res.setData(msg); } } catch (Exception e) { logger.error("Decode message error, please check provider and consumer use the same serializer" ); } } catch (Throwable e) { e.printStackTrace(); } return res; } }
清除 netty 原理的话就会知道在 ClientFinalChannelHandler 中接收上个 ChannelHandler 的处理结果的方法是 ClientFinalChannelHandler#channelRead
,这个方法很简单,就只是向线程池中提交了一个 EventHandleThread 任务,它持有了解码出来的结果,那么接下来就是看这个 Task 对我们的解码结果做了什么处理。直接查看它的 run 方法。可以知道它就是直接调用 handleMsg();
方法。因为我们收到的是服务端的 Response 数据包,所以只关注该方法的 else if (msg instanceof Response)
分支。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void handleMsg () throws IllegalAccessException, InstantiationException { else if (msg instanceof Response) { Response response = (Response) msg; if (logger.isDebugEnabled()) { logger.debug("Received response message of " + response); } byte status = response.getStatus(); if (Response.OK == status) { Object data = response.getData(); RpcResult result = getRpcResult(response); if (result != null ) { result.fillData(data); } } } }
可以看到所做的事情就是从 Response 中拿到响应的数据,然后将其填充到 RpcResult 中,特别注意,这个 RpcResult 是和 rpc Request 的 id 对应的,所以填充的结果也是针对对应的 RpcResult,而在 RpcResult#fillData
方法中,就是完成了结果的填充,并唤醒了阻塞在当前 rpc 请求对应的线程,这样 rpc 请求的结果就被服务接口客户端代理返回给接口的方法调用者。
netty 通信服务端 在服务端的启动中,我们也只是说了创建了 NettyServer,但是具体的细节没有进行说明,现在就完成这一部分的解释。和客户端一样,我们需要关注的是 ChannelInitializer 对于 NioSocketChannel 的初始化,方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder" , new NettyDecoderHandler (getUrl())) .addLast("encoder" , new NettyEncoderHandler (getUrl())) .addLast("server-idle-handler" , new IdleStateHandler (0 , 0 , IDLE_TIMEOUT, MILLISECONDS)) .addLast("heartbeat-handler" , new HeartbeatHandler ()) .addLast("final-handler" , new ServerFinalChannelHandler (getUrl())); } });
上文已经说过了前四个 ChannelHandler 的作用及使用了,服务端对于他们的使用基本一致,所以也不再赘述,最后一个 ChannelHandler 和 NettyClient 的最后一个 ChannelHandler 的作用基本一致,都是向线程池中提交了一个 EventHandleThread,里边的方法是一样的,只是因为处理的是来自客户端的 Request 包,所以走的是 EventHandleThread#handleMsg
方法的 if (msg instanceof Request)
分支。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 if (msg instanceof Request) { Request request = (Request) msg; if (logger.isDebugEnabled()) { logger.debug("Received request message of " + request); } if (!request.isEvent()) { Response response = buildResponse(request); Object data = request.getData(); if (data instanceof RpcInfo) { Object result = doInvoke((RpcInfo) data); response.setData(result); ctx.writeAndFlush(response); } } } private Object doInvoke (RpcInfo data) throws InstantiationException, IllegalAccessException { RpcInfo info = data; Invoker invoker = BeehiveContext.unsafeGet(info.getServiceName(), Invoker.class); ServiceConfigBean serviceConfigBean = BeehiveContext.unsafeGet(UrlConstants.PROVIDER_MODEL, ServiceConfigBean.class); Object target = serviceConfigBean.getRef(); return invoker.invoke(info.createInvokeInfo(target)); }
在该分支中,首先是构建了一个 Response 实例,这个实例的 id 是和 Request 一致的,这样客户端收到该 Response 数据包后才知道对应哪个 RpcResult 实例。接下来,就是从收到的 Request 中拿到数据 RpcInfo,里边包含了 rpc 的服务名、方法名、参数类型签名、参数值。EventHandleThread#doInvoke
方法就是根据以上这些进行服务接口实现类方法的调用。可以看到它先从 BeehiveContext 中拿到了一个 Invoker 实例,其实它就是在服务端启动时创建的 Invoker 链。其最后一个节点就封装了服务接口的实现类。当我们进行 invoker 链的 invoke 方法调用时,最终就会触发服务接口的实现类对应方法的调用,返回的就是该方法的执行结果。拿到该结果后,就是将它填充到 Response 实例中,调用 tx.writeAndFlush(response);
将响应结果返回,这样一个完成的 rpc 过程就算完成了。
TODO-LIST
底层通信框架的支持有待完善,比如 Mina(我没接触过)
缺少一个服务的管理模块,框架当前提供了相应的 filter 接口,通过实现该接口即可对 rpc 请求进行拦截,由此来获取部分监控信息
SPI 拓展机制没有完成相应的组件筛选功能,比如 filter 接口实现类,框架没有提供基础的选择性获取方式
异步消息处理的线程池的构建应该更加灵活,相应的拒绝执行策略有待完善
负载均衡策略缺陷严重,应该需要根据实际的 provider 负载情况来动态的调整