纸上得来终觉浅,绝知此事要躬行。
看过HSF和Dubbo的分析文章,总觉得少点什么,于是决定自己动手撸一个简易版的分布式RPC框架。实际上在学校的时候就曾经尝试做过一个最简单的RPC,但是过于简单,没有使用到服务注册中心,通信使用的是原生socket,更不支持注解形式,也没融入Spring生态,所以这次在此基础上完善了很多内容。
代码很少,逻辑清晰,如下:
RPC框架的架构设计如下图所示:
提供服务发现、服务注册等功能,有很多中间件都可以实现,各有千秋,有支持AP的,也有支持CP的,本项目中使用的是zk。
易用性、异步、高性能,netty把socket编程编程了一件身心愉悦的事情,何乐而不为呢。
本框架参照HSF使用hession协议,也可以使用其他协议,但是本项目中没去实现。
使用了CGLib的FastClass库,目标是高性能。
对于注册中心而言,目标是能够提供服务注册、服务发现,其中,服务注册能够监听服务数据更新,服务发现能够提供负载均衡,首先定义注册中心的接口如下:
public interface ServiceRegistry {
/**
* 服务注册
* @param serviceMetadata 服务元数据
*/
void register(ServiceMetadata serviceMetadata) throws Exception;
/**
* 服务注销
* @param serviceMetadata 服务元数据
*/
void unRegister(ServiceMetadata serviceMetadata) throws Exception;
/**
* 服务发现
* @param serviceName 服务名
* @return
* @throws Exception
*/
ServiceMetadata discovery(String serviceName) throws Exception;
/**
* 关闭
* @throws Exception
*/
void close() throws Exception;
}
Copy
服务发现使用了策略模式,支持多种服务发现中间件,本框架中只用zookeeper进行了实现,其他中间件待开发。
因为curator的curator-x-discovery模块已经有服务发现的封装,提供了以下几个API,直接拿来主义:
• registerService: 注册服务
• unregisterService: 注销服务
• ServiceProvider.getInstance: 服务发现,支持轮询、随机、粘性策略三种负载均衡策略,同时支持自定义策略
这里我使用的是轮询策略,选择负载均衡策略的实现用到了策略模式;另外这个粘性策略我特意看了一下,解释是如果第一次返回的是某一个节点数据,那么后续对于同一个客户端的请求,都会返回相同的节点数据,转念一想这不就是一致性Hash策略嘛。
题外话:看到最近Dubbo放弃了zookeeper采用nacos作为注册中心,也有文章说zk的CP特性不适合做注册中心,更适合作为集群调度中心,这里不纠结最优解,所以使用zk进行实现。
TIPS:
• 关闭控制台大量的zookeeper debug log的方法:在logback.xml中加一行:
<logger name="org.apache.zookeeper" level="ERROR" />
Copy
对于本项目暂时未开始实现eureka的注册中心,但对其有过一定的了解,是springcloud的御用注册中心,和zk不同的是,eureka保证的是AP,舍弃了强一致性保障可用性,这一点比zk更适合作为RPC的注册中心。
服务发现做完后,生产者端的逻辑如下:
定义RPCProvider注解:
@Retention(RetentionPolicy.RUNTIME) //运行时
@Target(ElementType.TYPE) //注解class
@Component //被spring加载
public @interface RPCProvider {
Class<?> serviceInterface() default Object.class; //设置接口
String serviceVersion() default "1.0.0"; //版本
}
Copy
在代码中实现ApplicationContextAware接口,就可以通过ApplicationContext获取spring容器加载的所有bean,因此可以获取到使用了RPCProvider注解的bean。代码如下:
因为注释很全了,就不再多bb了。
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//通过RPCProvider注解获取所有服务提供者
Map<String, Object> providerMap = applicationContext.getBeansWithAnnotation(RPCProvider.class);
if (MapUtils.isNotEmpty(providerMap)) {
for (Object providerBean : providerMap.values()) {
//获取注解上填写的服务接口和服务版本
RPCProvider rpcProvider = providerBean.getClass().getAnnotation(RPCProvider.class);
String serviceName = rpcProvider.serviceInterface().getName();
String version = rpcProvider.serviceVersion();
//构造服务唯一标识
String providerKey = ProviderUtils.makeKey(serviceName, version);
//缓存provider bean到本地缓存中
handlerMap.put(providerKey, providerBean);
// 注册服务到注册中心
String[] array = this.serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
//构造service metadata
ServiceMetadata metadata = ServiceMetadata.builder()
.address(host)
.serviceName(serviceName)
.servicePort(port)
.serviceVersion(version);
try {
serviceRegistry.register(metadata);
log.debug("register service: {}", metadata.toString());
} catch (Exception e) {
log.error("register service fail|{}|{}", metadata.toString(), e);
}
}
}
}
主要内容是实现一个netty server,端口号可以通过application.properties中的参数来定义。代码很简单:
/**
* 开启Netty服务监听,进行服务注册
*/
public void start() throws InterruptedException {
if (bossGroup == null || workerGroup == null) {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
//通用平台使用NioServerSocketChannel,Linux使用EpollServerSocketChannel
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0))
.addLast(new RpcDecoder())
.addLast(new RpcEncoder())
.addLast(new RpcProviderHandler(handlerMap));
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
// 启动服务监听
ChannelFuture future = bootstrap.bind(host, port).sync();
log.info("Server started on port {}", port);
// 同步等待,会hang住线程,所以需要单独开一个线程来调用start方法
future.channel().closeFuture().sync();
}
}
其中使用了自定义长度解码器:LengthFieldBasedFrameDecoder。
另外一个重要之处是消息处理器的实现,RpcProviderHandler核心逻辑如下:
private Object handle(RpcRequest request) throws Throwable {
String providerKey = ProviderUtils.makeKey(request.getClassName(), request.getServiceVersion());
Object providerBean = handlerMap.get(providerKey);
if (null == providerBean) {
throw new RuntimeException(String.format("provider not exist: %s:%s", request.getClassName(),
request.getMethodName()));
}
Class<?> providerClass = providerBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
//使用CGlib创建一个服务生产者代理对象,调用消费者指定的方法
FastClass providerFastClass = FastClass.create(providerClass);
int methodIndex = providerFastClass.getIndex(methodName, parameterTypes);
return providerFastClass.invoke(methodIndex, providerBean, parameters);
}
关于springboot-starter的封装,之前写过一篇文档,参照之完成了对provider模块的封装,于是写了一个RpcProviderAutoConfiguration类,并在spring.factories里注册就OK了。
@Configuration
@EnableConfigurationProperties(RpcProperties.class)
public class RpcProviderAutoConfiguration {
@Resource
private RpcProperties rpcProperties;
@Bean
public RpcProvider init() throws Exception {
//通过配置决定使用哪种注册中心
ServiceRegistryType type = ServiceRegistryType.valueOf(rpcProperties.getServiceRegistryType());
//单例
ServiceRegistry serviceRegistry = ServiceRegistryFactory.getInstance(type, rpcProperties.getServiceRegistryAddress());
return new RpcProvider(rpcProperties.getServiceAddress(), serviceRegistry);
}
}
Copy
消费者端的实现本身并没有什么难度,思路是根据服务名+服务版本,从注册中心找到一个服务生产者的地址,通过Java Proxy API构造一个代理对象,在消费者进行服务调用时,代理对象发起一次网络请求获得远程生产者的结果。于是我就把消费者模块放到了最后来实现,但是写着写着发现不对劲,我的目标不是简单的完成一个消费者,而是要封装成一个易用的框架,必须使用注解来初始化消费者。平时在使用HSF的时候,通常用法是这样的:
@HSFConsumer
private BizService bizService;
public BizResponse handleBiz(String buyerId) {
return bizService.doBiz(buyerId);
}
Copy
使用者在调用服务时,只需要简简单单的引用服务的SDK,加上一个HSFConsumer注解,接下来就可以直接在Spring中使用这个bean。所以,全部的工作都交给了RPC框架,最主要的功能就是将服务生产者的接口类,实例化为一个代理的bean,并且注册到Spring容器中。整体流程如下:
首先定义RPCConsumer注解:
@Retention(RetentionPolicy.RUNTIME) //运行时解析
@Target(ElementType.FIELD) //使用目标为field,即一个属性
@Autowired //被spring容器加载
public @interface RPCConsumer {
/**
* @see com.fuxi.study.consumer.RpcConsumerBean#setServiceVersion(String)
* @return
*/
String serviceVersion() default "1.0.0";
/**
* @see com.fuxi.study.consumer.RpcConsumerBean#setRegistryType(String)
* @return
*/
String registryType() default "zookeeper";
/**
* @see com.fuxi.study.consumer.RpcConsumerBean#setRegistryAddress(String)
* @return
*/
String registryAddress() default "127.0.0.1:2181";
}
Copy
该注解使用方式和@HSFConsumer、@Resource、@Autowired一样,从Spring容器中取出一个bean实例。
说实话,一开始遇到这个问题的时候我是懵逼的,输入只有一个接口和一个RPCConsumer注解,如何把这个接口变成一个代理bean并且注入到Spring容器呢?于是机智的我打开了HSF的源码,发现HSFConsumer的注解处理器类实现了BeanFactoryPostProcessor接口,接着去看了这个接口的资料,先上接口定义:
/**
* Allows for custom modification of an application context's bean definitions,
* adapting the bean property values of the context's underlying bean factory.
*
* <p>Application contexts can auto-detect BeanFactoryPostProcessor beans in
* their bean definitions and apply them before any other beans get created.
*
* <p>Useful for custom config files targeted at system administrators that
* override bean properties configured in the application context.
*
* <p>See PropertyResourceConfigurer and its concrete implementations
* for out-of-the-box solutions that address such configuration needs.
*
* <p>A BeanFactoryPostProcessor may interact with and modify bean
* definitions, but never bean instances. Doing so may cause premature bean
* instantiation, violating the container and causing unintended side-effects.
* If bean instance interaction is required, consider implementing
* {@link BeanPostProcessor} instead.
**/
public interface BeanFactoryPostProcessor {
/**
* Modify the application context's internal bean factory after its standard
* initialization. All bean definitions will have been loaded, but no beans
* will have been instantiated yet. This allows for overriding or adding
* properties even to eager-initializing beans.
*/
void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
}
Copy
这一大串英文说的是啥意思呢,主要想表达以下几点:
• 1.可以在bean初始化(initialization)之前修改bean definition,达到hack bean的效果
• 2.不要随意实例化(instantiation)bean,不然会有意料之外的副作用
在SpringBoot启动方法里,可以看到会先加载所有的BeanFactoryPostProcessors,接着再加载BeanPostProcessors:
所以接下来,我们先通过postProcessBeanFactory找到所有被RPCConsumer修饰的bean,这些bean在代码中的存在形式是一个Field,即一个类中的属性,例如:
@RPCConsumer
private MyService myService;
Copy
拿到MyService这个属性后,再把这个属性的接口替换为一个代理bean,并重新注册到Spring的beanDefinitionMap中,这样就实现了bean的hack。hack的代码如下:
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
//遍历容器里的所有bean
for (String beanName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
String beanClassName = definition.getBeanClassName();
// 当用 @Bean 返回的类型是Object时,beanClassName是 null
if(beanClassName != null) {
//使用反射获取bean的class对象,注意classloader是容器加载bean的classloader
Class<?> clazz = ClassUtils.resolveClassName(definition.getBeanClassName(), this.classLoader);
ReflectionUtils.doWithFields(clazz, this::parseElement);
}
}
//重新注入到容器中
BeanDefinitionRegistry registry = (BeanDefinitionRegistry)beanFactory;
this.beanDefinitions.forEach((beanName, beanDefinition) -> {
if (context.containsBean(beanName)) {
throw new IllegalArgumentException("[RPC Starter] Spring context already has a bean named " + beanName
+ ", please change @RPCConsumer field name.");
}
registry.registerBeanDefinition(beanName, beanDefinitions.get(beanName));
log.info("registered RPCConsumerBean \"{}\" in spring context.", beanName);
});
}
/**
* 动态修改被RPCConsumer注解的bean,改为代理类
* @param field
*/
private void parseElement(Field field) {
RPCConsumer annotation = AnnotationUtils.getAnnotation(field, RPCConsumer.class);
if (annotation == null) {
return;
}
//构造新的factory bean的参数,hack成为自己的代理bean
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcConsumerBean.class);
builder.setInitMethodName(Constants.INIT_METHOD);
builder.addPropertyValue("serviceVersion", annotation.serviceVersion());
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("registryType", annotation.registryType());
builder.addPropertyValue("registryAddress", annotation.registryAddress());
BeanDefinition beanDefinition = builder.getBeanDefinition();
beanDefinitions.put(field.getName(), beanDefinition);
}
Copy
RPCConsumerBean是自定义一个代理消费者的Bean,实现了FactoryBean接口,RPCConsumerBean定义如下:
@Slf4j
public class RpcConsumerBean implements FactoryBean {
private Class<?> interfaceClass;
private String serviceVersion;
private String registryType;
private String registryAddress;
private Object object;
@Override
public Object getObject() throws Exception {
return this.object;
}
@Override
public Class<?> getObjectType() {
return interfaceClass;
}
@Override
public boolean isSingleton() {
return true;
}
public void init() throws Exception {
this.object = RpcConsumer.create(interfaceClass, serviceVersion, ServiceRegistryFactory.getInstance(
ServiceRegistryType.valueOf(registryType), registryAddress
));
log.info("RpcConsumerBean {} init ...", interfaceClass.getName());
}
/**
* setter注入
* @param interfaceClass
*/
public void setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
}
public void setServiceVersion(String serviceVersion) {
this.serviceVersion = serviceVersion;
}
public void setRegistryType(String registryType) {
this.registryType = registryType;
}
public void setRegistryAddress(String registryAddress) {
this.registryAddress = registryAddress;
}
}
Copy
这样一套操作下来,SpringBoot在启动时,就会把RPCConsumer修饰的bean替换为自定义的RpcConsumerBean了。
使用了Proxy.newProxyInstance API来创建一个代理:
public static <T> T create(Class<T> interfaceClass, String serviceVersion, ServiceRegistry serviceRegistry) {
return (T)Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[] {interfaceClass},
new RpcInvokeHandler<>(serviceVersion, serviceRegistry));
}
Copy
动态代理实现类RpcInvokeHandler实现:
public class RpcInvokeHandler<T> implements InvocationHandler {
private static final String EQUALS = "equals";
private static final String HASH_CODE = "hashCode";
private static final String TO_STRING = "toString";
private String serviceVersion;
private ServiceRegistry serviceRegistry;
public RpcInvokeHandler() {
}
public RpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {
this.serviceVersion = serviceVersion;
this.serviceRegistry = serviceRegistry;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class == method.getDeclaringClass()) {
String name = method.getName();
if (EQUALS.equals(name)) {
return proxy == args[0];
} else if (HASH_CODE.equals(name)) {
return System.identityHashCode(proxy);
} else if (TO_STRING.equals(name)) {
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
} else {
throw new IllegalStateException(String.valueOf(method));
}
}
//通过netty向service provider发起网络请求
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setServiceVersion(this.serviceVersion);
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
RpcConsumer rpcConsumer = new RpcConsumer(this.serviceRegistry);
RpcResponse rpcResponse = rpcConsumer.sendRequest(request);
if (rpcResponse != null) {
log.debug("consumer receive provider rpc response: {}", rpcResponse.toString());
return rpcResponse.getResult();
} else {
throw new RuntimeException("consumer rpc fail, response is null");
}
}
}
Copy
RpcConsumerAutoConfiguration的实现:
@Configuration
@EnableConfigurationProperties(RpcProperties.class)
public class RpcConsumerAutoConfiguration {
@Bean
public static BeanFactoryPostProcessor rpcConsumerPostProcess() {
return new RpcConsumerPostProcessor();
}
}
Copy
再将其注册到spring.factories中。
做完生产者和消费者模块后,发现控制台模块实际上和RPC关系不大,而是一个通用的SDK管控平台,鉴于已经做过几个管控平台服务端了,于是没有花精力继续投入在这块了。
克隆代码到本地,使用maven打包到本地仓库:
mvn clean install -Dmaven.test.skip=true
Copy
引入maven依赖:
<dependency>
<groupId>com.fuxi.study</groupId>
<artifactId>rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
Copy
定义接口:
public interface MyService {
String hello(String world);
}
Copy
实现接口,并注册为RPCProvider:
@RPCProvider(serviceVersion = "1.0.0.daily", serviceInterface = MyService.class)
public class MyServiceImpl implements MyService {
@Override
public String hello(String world) {
return String.format("server response at: %d for %s", System.currentTimeMillis(), world);
}
}
Copy
消费者代码:
@RPCConsumer(serviceVersion = "1.0.0.daily")
private MyService myService;
@GetMapping("/rpc")
public BaseResponse testRpc(@RequestParam String param) {
return BaseResponse.ok(
myService.hello(param)
);
}
Copy
配置文件:
# rpc
rpc.service-address=127.0.0.1:50001
rpc.service-registry-type=zookeeper
rpc.service-registry-address=127.0.0.1:2181
Copy
可以看到,整体使用效果和HSF是基本一致的,方便快捷易用~
运行起来的日志中生产者启动了端口:
调用一下测试HTTP接口:
完美~