从零开发一个RPC框架

从零开发一个RPC框架

纸上得来终觉浅,绝知此事要躬行。

看过HSF和Dubbo的分析文章,总觉得少点什么,于是决定自己动手撸一个简易版的分布式RPC框架。实际上在学校的时候就曾经尝试做过一个最简单的RPC,但是过于简单,没有使用到服务注册中心,通信使用的是原生socket,更不支持注解形式,也没融入Spring生态,所以这次在此基础上完善了很多内容。

代码很少,逻辑清晰,如下:


一、架构设计

RPC框架的架构设计如下图所示:


核心模块技术栈:


注册中心:zookeeper/eureka/etcd/consul...

提供服务发现、服务注册等功能,有很多中间件都可以实现,各有千秋,有支持AP的,也有支持CP的,本项目中使用的是zk。


网络调用:Netty

易用性、异步、高性能,netty把socket编程编程了一件身心愉悦的事情,何乐而不为呢。


序列化协议:Hession/Kyro/protobuf

本框架参照HSF使用hession协议,也可以使用其他协议,但是本项目中没去实现。


动态代理:CGLib

使用了CGLib的FastClass库,目标是高性能。


二、注册中心

对于注册中心而言,目标是能够提供服务注册、服务发现,其中,服务注册能够监听服务数据更新,服务发现能够提供负载均衡,首先定义注册中心的接口如下:

public interface ServiceRegistry {
    /**
     * 服务注册
     * @param serviceMetadata  服务元数据
     */
    void register(ServiceMetadata serviceMetadata) throws Exception;
    /**
     * 服务注销
     * @param serviceMetadata  服务元数据
     */
    void unRegister(ServiceMetadata serviceMetadata) throws Exception;
    /**
     * 服务发现
     * @param serviceName    服务名
     * @return
     * @throws Exception
     */
    ServiceMetadata discovery(String serviceName) throws Exception;
    /**
     * 关闭
     * @throws Exception
     */
    void close() throws Exception;
}

Copy

服务发现使用了策略模式,支持多种服务发现中间件,本框架中只用zookeeper进行了实现,其他中间件待开发。


zookeeper实现

因为curator的curator-x-discovery模块已经有服务发现的封装,提供了以下几个API,直接拿来主义:

• registerService: 注册服务

• unregisterService: 注销服务

• ServiceProvider.getInstance: 服务发现,支持轮询、随机、粘性策略三种负载均衡策略,同时支持自定义策略

这里我使用的是轮询策略,选择负载均衡策略的实现用到了策略模式;另外这个粘性策略我特意看了一下,解释是如果第一次返回的是某一个节点数据,那么后续对于同一个客户端的请求,都会返回相同的节点数据,转念一想这不就是一致性Hash策略嘛。

题外话:看到最近Dubbo放弃了zookeeper采用nacos作为注册中心,也有文章说zk的CP特性不适合做注册中心,更适合作为集群调度中心,这里不纠结最优解,所以使用zk进行实现。

TIPS:

• 关闭控制台大量的zookeeper debug log的方法:在logback.xml中加一行:

<logger name="org.apache.zookeeper" level="ERROR" />

Copy


eureka实现

对于本项目暂时未开始实现eureka的注册中心,但对其有过一定的了解,是springcloud的御用注册中心,和zk不同的是,eureka保证的是AP,舍弃了强一致性保障可用性,这一点比zk更适合作为RPC的注册中心。


三、provider实现

服务发现做完后,生产者端的逻辑如下:


3.1 RPCProvider注解

定义RPCProvider注解:

@Retention(RetentionPolicy.RUNTIME)  //运行时
@Target(ElementType.TYPE)            //注解class
@Component                           //被spring加载
public @interface RPCProvider {
    Class<?> serviceInterface() default Object.class;   //设置接口
    String serviceVersion() default "1.0.0";            //版本
}

Copy


3.2 扫描使用了RPCProvider注解的Bean并注册

在代码中实现ApplicationContextAware接口,就可以通过ApplicationContext获取spring容器加载的所有bean,因此可以获取到使用了RPCProvider注解的bean。代码如下:

因为注释很全了,就不再多bb了。

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //通过RPCProvider注解获取所有服务提供者
        Map<String, Object> providerMap = applicationContext.getBeansWithAnnotation(RPCProvider.class);
        if (MapUtils.isNotEmpty(providerMap)) {
            for (Object providerBean : providerMap.values()) {
                //获取注解上填写的服务接口和服务版本
                RPCProvider rpcProvider = providerBean.getClass().getAnnotation(RPCProvider.class);
                String serviceName = rpcProvider.serviceInterface().getName();
                String version = rpcProvider.serviceVersion();
                //构造服务唯一标识
                String providerKey = ProviderUtils.makeKey(serviceName, version);
                //缓存provider bean到本地缓存中
                handlerMap.put(providerKey, providerBean);
                // 注册服务到注册中心
                String[] array = this.serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
                //构造service metadata
                ServiceMetadata metadata = ServiceMetadata.builder()
                    .address(host)
                    .serviceName(serviceName)
                    .servicePort(port)
                    .serviceVersion(version);
                try {
                    serviceRegistry.register(metadata);
                    log.debug("register service: {}", metadata.toString());
                } catch (Exception e) {
                    log.error("register service fail|{}|{}", metadata.toString(), e);
                }
            }
        }
    }


3.3 开启netty端口,实现消息处理器

主要内容是实现一个netty server,端口号可以通过application.properties中的参数来定义。代码很简单:

/**
     * 开启Netty服务监听,进行服务注册
     */
public void start() throws InterruptedException {
    if (bossGroup == null || workerGroup == null) {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        //通用平台使用NioServerSocketChannel,Linux使用EpollServerSocketChannel
        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                        .addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0))
                        .addLast(new RpcDecoder())
                        .addLast(new RpcEncoder())
                        .addLast(new RpcProviderHandler(handlerMap));
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
        String[] array = serverAddress.split(":");
        String host = array[0];
        int port = Integer.parseInt(array[1]);
        // 启动服务监听
        ChannelFuture future = bootstrap.bind(host, port).sync();
        log.info("Server started on port {}", port);

        // 同步等待,会hang住线程,所以需要单独开一个线程来调用start方法
        future.channel().closeFuture().sync();
    }
}


其中使用了自定义长度解码器:LengthFieldBasedFrameDecoder。

另外一个重要之处是消息处理器的实现,RpcProviderHandler核心逻辑如下:

private Object handle(RpcRequest request) throws Throwable {
        String providerKey = ProviderUtils.makeKey(request.getClassName(), request.getServiceVersion());
        Object providerBean = handlerMap.get(providerKey);
        if (null == providerBean) {
            throw new RuntimeException(String.format("provider not exist: %s:%s", request.getClassName(),
                request.getMethodName()));
        }
        Class<?> providerClass = providerBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        //使用CGlib创建一个服务生产者代理对象,调用消费者指定的方法
        FastClass providerFastClass = FastClass.create(providerClass);
        int methodIndex = providerFastClass.getIndex(methodName, parameterTypes);
        return providerFastClass.invoke(methodIndex, providerBean, parameters);
    }


3.4 starter封装

关于springboot-starter的封装,之前写过一篇文档,参照之完成了对provider模块的封装,于是写了一个RpcProviderAutoConfiguration类,并在spring.factories里注册就OK了。

@Configuration
@EnableConfigurationProperties(RpcProperties.class)
public class RpcProviderAutoConfiguration {
    @Resource
    private RpcProperties rpcProperties;
    @Bean
    public RpcProvider init() throws Exception {
        //通过配置决定使用哪种注册中心
        ServiceRegistryType type = ServiceRegistryType.valueOf(rpcProperties.getServiceRegistryType());
        //单例
        ServiceRegistry serviceRegistry = ServiceRegistryFactory.getInstance(type, rpcProperties.getServiceRegistryAddress());
        return new RpcProvider(rpcProperties.getServiceAddress(), serviceRegistry);
    }
}

Copy


四、consumer实现

消费者端的实现本身并没有什么难度,思路是根据服务名+服务版本,从注册中心找到一个服务生产者的地址,通过Java Proxy API构造一个代理对象,在消费者进行服务调用时,代理对象发起一次网络请求获得远程生产者的结果。于是我就把消费者模块放到了最后来实现,但是写着写着发现不对劲,我的目标不是简单的完成一个消费者,而是要封装成一个易用的框架,必须使用注解来初始化消费者。平时在使用HSF的时候,通常用法是这样的:

@HSFConsumer
private BizService bizService;

public BizResponse handleBiz(String buyerId) {
    return bizService.doBiz(buyerId);
}

Copy

使用者在调用服务时,只需要简简单单的引用服务的SDK,加上一个HSFConsumer注解,接下来就可以直接在Spring中使用这个bean。所以,全部的工作都交给了RPC框架,最主要的功能就是将服务生产者的接口类,实例化为一个代理的bean,并且注册到Spring容器中。整体流程如下:


4.1 RPCConsumer注解定义

首先定义RPCConsumer注解:

@Retention(RetentionPolicy.RUNTIME) //运行时解析
@Target(ElementType.FIELD)          //使用目标为field,即一个属性
@Autowired                          //被spring容器加载
public @interface RPCConsumer {
    /**
     * @see com.fuxi.study.consumer.RpcConsumerBean#setServiceVersion(String)
     * @return
     */
    String serviceVersion() default "1.0.0";
    /**
     * @see com.fuxi.study.consumer.RpcConsumerBean#setRegistryType(String)
     * @return
     */
    String registryType() default "zookeeper";
    /**
     * @see com.fuxi.study.consumer.RpcConsumerBean#setRegistryAddress(String)
     * @return
     */
    String registryAddress() default "127.0.0.1:2181";
}

Copy

该注解使用方式和@HSFConsumer、@Resource、@Autowired一样,从Spring容器中取出一个bean实例。


4.2 实现bean的注入

说实话,一开始遇到这个问题的时候我是懵逼的,输入只有一个接口和一个RPCConsumer注解,如何把这个接口变成一个代理bean并且注入到Spring容器呢?于是机智的我打开了HSF的源码,发现HSFConsumer的注解处理器类实现了BeanFactoryPostProcessor接口,接着去看了这个接口的资料,先上接口定义:

/**
 * Allows for custom modification of an application context's bean definitions,
 * adapting the bean property values of the context's underlying bean factory.
 *
 * <p>Application contexts can auto-detect BeanFactoryPostProcessor beans in
 * their bean definitions and apply them before any other beans get created.
 *
 * <p>Useful for custom config files targeted at system administrators that
 * override bean properties configured in the application context.
 *
 * <p>See PropertyResourceConfigurer and its concrete implementations
 * for out-of-the-box solutions that address such configuration needs.
 *
 * <p>A BeanFactoryPostProcessor may interact with and modify bean
 * definitions, but never bean instances. Doing so may cause premature bean
 * instantiation, violating the container and causing unintended side-effects.
 * If bean instance interaction is required, consider implementing
 * {@link BeanPostProcessor} instead.
 **/
public interface BeanFactoryPostProcessor {
    /**
     * Modify the application context's internal bean factory after its standard
     * initialization. All bean definitions will have been loaded, but no beans
     * will have been instantiated yet. This allows for overriding or adding
     * properties even to eager-initializing beans.
     */
    void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
}

Copy

这一大串英文说的是啥意思呢,主要想表达以下几点:

• 1.可以在bean初始化(initialization)之前修改bean definition,达到hack bean的效果

• 2.不要随意实例化(instantiation)bean,不然会有意料之外的副作用

在SpringBoot启动方法里,可以看到会先加载所有的BeanFactoryPostProcessors,接着再加载BeanPostProcessors:

所以接下来,我们先通过postProcessBeanFactory找到所有被RPCConsumer修饰的bean,这些bean在代码中的存在形式是一个Field,即一个类中的属性,例如:

@RPCConsumer
private MyService myService;

Copy

拿到MyService这个属性后,再把这个属性的接口替换为一个代理bean,并重新注册到Spring的beanDefinitionMap中,这样就实现了bean的hack。hack的代码如下:

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.beanFactory = beanFactory;
    //遍历容器里的所有bean
    for (String beanName : beanFactory.getBeanDefinitionNames()) {
        BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
        String beanClassName = definition.getBeanClassName();
        // 当用 @Bean 返回的类型是Object时,beanClassName是 null
        if(beanClassName != null) {
            //使用反射获取bean的class对象,注意classloader是容器加载bean的classloader
            Class<?> clazz = ClassUtils.resolveClassName(definition.getBeanClassName(), this.classLoader);
            ReflectionUtils.doWithFields(clazz, this::parseElement);
        }
    }
    //重新注入到容器中
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry)beanFactory;
    this.beanDefinitions.forEach((beanName, beanDefinition) -> {
        if (context.containsBean(beanName)) {
            throw new IllegalArgumentException("[RPC Starter] Spring context already has a bean named " + beanName
                                               + ", please change @RPCConsumer field name.");
        }
        registry.registerBeanDefinition(beanName, beanDefinitions.get(beanName));
        log.info("registered RPCConsumerBean \"{}\" in spring context.", beanName);
    });
}
/**
     * 动态修改被RPCConsumer注解的bean,改为代理类
     * @param field
     */
private void parseElement(Field field) {
    RPCConsumer annotation = AnnotationUtils.getAnnotation(field, RPCConsumer.class);
    if (annotation == null) {
        return;
    }
    //构造新的factory bean的参数,hack成为自己的代理bean
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcConsumerBean.class);
    builder.setInitMethodName(Constants.INIT_METHOD);
    builder.addPropertyValue("serviceVersion", annotation.serviceVersion());
    builder.addPropertyValue("interfaceClass", field.getType());
    builder.addPropertyValue("registryType", annotation.registryType());
    builder.addPropertyValue("registryAddress", annotation.registryAddress());
    BeanDefinition beanDefinition = builder.getBeanDefinition();
    beanDefinitions.put(field.getName(), beanDefinition);
}

Copy

RPCConsumerBean是自定义一个代理消费者的Bean,实现了FactoryBean接口,RPCConsumerBean定义如下:

@Slf4j
public class RpcConsumerBean implements FactoryBean {
    private Class<?> interfaceClass;
    private String serviceVersion;
    private String registryType;
    private String registryAddress;
    private Object object;
    @Override
    public Object getObject() throws Exception {
        return this.object;
    }
    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }
    @Override
    public boolean isSingleton() {
        return true;
    }
    public void init() throws Exception {
        this.object = RpcConsumer.create(interfaceClass, serviceVersion, ServiceRegistryFactory.getInstance(
            ServiceRegistryType.valueOf(registryType), registryAddress
        ));
        log.info("RpcConsumerBean {} init ...", interfaceClass.getName());
    }
    /**
     * setter注入
     * @param interfaceClass
     */
    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }
    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }
    public void setRegistryType(String registryType) {
        this.registryType = registryType;
    }
    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

Copy

这样一套操作下来,SpringBoot在启动时,就会把RPCConsumer修饰的bean替换为自定义的RpcConsumerBean了。


4.3 动态代理的实现

使用了Proxy.newProxyInstance API来创建一个代理:

public static <T> T create(Class<T> interfaceClass, String serviceVersion, ServiceRegistry serviceRegistry) {
    return (T)Proxy.newProxyInstance(
        interfaceClass.getClassLoader(),
        new Class<?>[] {interfaceClass},
        new RpcInvokeHandler<>(serviceVersion, serviceRegistry));
}

Copy

动态代理实现类RpcInvokeHandler实现:

public class RpcInvokeHandler<T> implements InvocationHandler {
    private static final String EQUALS = "equals";
    private static final String HASH_CODE = "hashCode";
    private static final String TO_STRING = "toString";
    private String serviceVersion;
    private ServiceRegistry serviceRegistry;
    public RpcInvokeHandler() {
    }
    public RpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {
        this.serviceVersion = serviceVersion;
        this.serviceRegistry = serviceRegistry;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class == method.getDeclaringClass()) {
            String name = method.getName();
            if (EQUALS.equals(name)) {
                return proxy == args[0];
            } else if (HASH_CODE.equals(name)) {
                return System.identityHashCode(proxy);
            } else if (TO_STRING.equals(name)) {
                return proxy.getClass().getName() + "@" +
                    Integer.toHexString(System.identityHashCode(proxy)) +
                    ", with InvocationHandler " + this;
            } else {
                throw new IllegalStateException(String.valueOf(method));
            }
        }
        //通过netty向service provider发起网络请求
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setServiceVersion(this.serviceVersion);
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        RpcConsumer rpcConsumer = new RpcConsumer(this.serviceRegistry);
        RpcResponse rpcResponse = rpcConsumer.sendRequest(request);
        if (rpcResponse != null) {
            log.debug("consumer receive provider rpc response: {}", rpcResponse.toString());
            return rpcResponse.getResult();
        } else {
            throw new RuntimeException("consumer rpc fail, response is null");
        }
    }
}

Copy


4.4 starter封装

RpcConsumerAutoConfiguration的实现:

@Configuration
@EnableConfigurationProperties(RpcProperties.class)
public class RpcConsumerAutoConfiguration {
    @Bean
    public static BeanFactoryPostProcessor rpcConsumerPostProcess() {
        return new RpcConsumerPostProcessor();
    }
}

Copy

再将其注册到spring.factories中。


五、控制台

做完生产者和消费者模块后,发现控制台模块实际上和RPC关系不大,而是一个通用的SDK管控平台,鉴于已经做过几个管控平台服务端了,于是没有花精力继续投入在这块了。


六、测试


6.1 打包

克隆代码到本地,使用maven打包到本地仓库:

mvn clean install -Dmaven.test.skip=true

Copy


6.2 在SpringBoot项目中应用

引入maven依赖:

<dependency>
    <groupId>com.fuxi.study</groupId>
    <artifactId>rpc</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

Copy

定义接口:

public interface MyService {
    String hello(String world);
}

Copy

实现接口,并注册为RPCProvider:

@RPCProvider(serviceVersion = "1.0.0.daily", serviceInterface = MyService.class)
public class MyServiceImpl implements MyService {
    @Override
    public String hello(String world) {
        return String.format("server response at: %d for %s", System.currentTimeMillis(), world);
    }
}

Copy

消费者代码:

@RPCConsumer(serviceVersion = "1.0.0.daily")
private MyService myService;
@GetMapping("/rpc")
public BaseResponse testRpc(@RequestParam String param) {
    return BaseResponse.ok(
        myService.hello(param)
    );
}

Copy

配置文件:

# rpc
rpc.service-address=127.0.0.1:50001
rpc.service-registry-type=zookeeper
rpc.service-registry-address=127.0.0.1:2181

Copy

可以看到,整体使用效果和HSF是基本一致的,方便快捷易用~

运行起来的日志中生产者启动了端口:

调用一下测试HTTP接口:

完美~

评论区
Rick ©2018