找回密码
 立即注册
查看: 296|回复: 0

干货!八千字让你彻底学会写一个中高级程序员必会的分布式RPC框架

[复制链接]
发表于 2022-7-5 13:17 | 显示全部楼层 |阅读模式
一.概述
什么是RPC?
    远程服务调用官方:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想通俗一点:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。市面上常见的rpc框架:dobbo,springCloud,gRPC...
<hr>
那为什么要有 RPC,HTTP 不好么?
    因为 RPC 和 HTTP 就不是一个层级的东西,所以严格意义上这两个没有可比性,也不应该来作比较。HTTP 只是传输协议,协议只是规范了一定的交流格式RPC 对比的是本地过程调用,是用来作为分布式系统之间的通信,它可以用 HTTP 来传输,也可以基于 TCP 自定义协议传输。HTTP 协议比较冗余,所以 RPC 大多都是基于 TCP 自定义协议,定制化的才是最适合自己的。
<hr>
项目总体结构

<hr>
整体架构

接下来,分别解释上述的过程
二.自定义注解
服务的提供者和消费者公用一个接口,@ServiceExpose是为了暴露服务,放在生产者的某个实现类上;@ServiceReference是为了引用服务,放在消费者的需要注入的属性上。

    Target:指定被修饰的Annotation可以放置的位置(被修饰的目标)

    @Target(ElementType.TYPE) //接口、类

    @Target(ElementType.FIELD) //属性

    @Target(ElementType.METHOD) //方法

    @Target(ElementType.PARAMETER) //方法参数

    @Target(ElementType.CONSTRUCTOR) //构造函数

    @Target(ElementType.LOCAL_VARIABLE) //局部变量

    @Target(ElementType.ANNOTATION_TYPE) //注解

    @Target(ElementType.PACKAGE) //包

    Retention:定义注解的保留策略

    @Retention(RetentionPolicy.SOURCE) //注解仅存在于源码中,在class字节码文件中不包含

    @Retention(RetentionPolicy.CLASS) //默认的保留策略,注解会在class字节码文件中存在,但运行时无法获得

    @Retention(RetentionPolicy.RUNTIME) //注解会在class字节码文件中存在,在运行时可以通过反射获取到

    Documented:指定被修饰的该Annotation可以被javadoc工具提取成文档

    Inherited:指定被修饰的Annotation将具有继承性
二.启动配置
主要是加载一些rpc相关的配置类,使用SpringBoot自动装配。可以使用SPI机制加入一些自定义的类,放到指定文件夹中。
三.rpc接口注入/rpc服务扫描
这里主要就是通过反射获得对应注解的属性/类,进行服务暴露/服务引用。 这里需要关注的是什么时候进行服务暴露/引用?如下:
    客户端:一般有俩种方案饿汉式:饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet方法,容器通过调用 ReferenceBeanafterPropertiesSet方法时引入服务。(在Spring启动时,给所有的属性注入实现类,包含远程和本地的实现类)懒汉式:只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。在应用的Spring IOC 容器刷新完毕(spring Context初始化)之后,扫描所有的Bean,将Bean中带有@ServiceExpose/@ServiceReference注解的field获取到,然后创建field类型的代理对象,创建完成后,将代理对象set给此field。后续就通过该代理对象创建服务端连接,并发起调用。(dubbo默认)服务端:与懒汉式一样。

那么怎么知道Spring IOC刷新完成,这里就使用一个Spring提供的监听器,当Spring IOC刷新完成,就会触发监听器。
四.服务注册到ZK/从Zk获得服务
Zookeeper采用节点树的数据模型,类似linux文件系统,/,/node1,/node2 比较简单。不懂Zookeeper请移步:Zookeeper原理

我们采用的是对每个服务名创建一个持久节点,服务注册时实际上就是在zookeeper中该持久节点下创建了一个临时节点,该临时节点存储了服务的IP、端口、序列化方式等。

客户端获取服务时通过获取持久节点下的临时节点列表,解析服务地址数据:

客户端监听服务变化:
五.生成代理类对象
这里使用JDK的动态代理,也可以使用cglib或者Javassist(dobbo使用)。
public class ClientProxyFactory {    /**     * 获取代理对象,绑定 invoke 行为     *     * @param clazz 接口 class 对象     * @param <T>   类型     * @return 代理对象     */public <T> T getProxyInstance(Class<T> clazz) {        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {            final Random random = new Random();            @Override            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                // 第一步:通过服务发现机制选择一个服务提供者暴露的服务                String serviceName = clazz.getName();                final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);                logger.info("Rpc server instance list: {}", serviceInfos);                if (CollectionUtils.isEmpty(serviceInfos)) {                    throw new RpcException("No rpc servers found.");                }                // TODO: 这里模拟负载均衡,从多个服务提供者暴露的服务中随机挑选一个,后期写方法实现负载均衡                final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));                // 第二步:构造 rpc 请求对象                final RpcRequest rpcRequest = new RpcRequest();                rpcRequest.setServiceName(serviceName);                rpcRequest.setMethod(method.getName());                rpcRequest.setParameterTypes(method.getParameterTypes());                rpcRequest.setParameters(args);                // 第三步:编码请求消息, TODO: 这里可以配置多种编码方式                byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);                // 第四步:调用 rpc client 开始发送消息                byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);                // 第五步:解码响应消息                final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);                // 第六步:解析返回结果进行处理                if (rpcResponse.getException() != null) {                    throw rpcResponse.getException();                }                return rpcResponse.getRetValue();            }        });    }}
六.负载均衡
本实现支持两种主要负载均衡策略,随机和轮询,其中他们都支持带权重的随机和轮询,其实也就是四种策略。
七.Netty通信
服务端和客户端基本一样,这里只展示服务端的代码。代理对象在Spring启动的时候就生成了,但是没有调用,每一个调用(请求)都会生成一个Netty的连接。
public class NettyRpcServer extends RpcServer {    @Override    public void start() {        // 创建两个线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            // 创建服务端的启动对象            ServerBootstrap serverBootstrap = new ServerBootstrap()                    // 设置两个线程组                    .group(bossGroup, workerGroup)                    // 设置服务端通道实现类型                    .channel(NioServerSocketChannel.class)                    // 服务端用于接收进来的连接,也就是boosGroup线程, 线程队列大小                    .option(ChannelOption.SO_BACKLOG, 100)                    .childOption(ChannelOption.SO_KEEPALIVE, true)                    // child 通道,worker 线程处理器                    .childHandler(new ChannelInitializer<SocketChannel>() {                        // 给 pipeline 管道设置自定义的处理器                        @Override                        public void initChannel(SocketChannel channel) {                            ChannelPipeline pipeline = channel.pipeline();                            pipeline.addLast(new NettyServerHandler());                        }                    });            // 绑定端口号,同步启动服务            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();            channel = channelFuture.channel();            // 对关闭通道进行监听,变为同步            channelFuture.channel().closeFuture().sync();        } catch (Exception e) {            logger.error("server error.", e);        } finally {            // 释放线程组资源            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
实现具体handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {    //当通道就绪就会触发该方法    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //进行记录        logger.info("channel active: {}", ctx);    }    //读取数据实际(这里我们可以读取客户端发送的消息)    @Override    public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {        //将数据读到buffer中        final ByteBuf msgBuf = (ByteBuf) msg;        final byte[] reqBytes = new byte[msgBuf.readableBytes()];        msgBuf.readBytes(reqBytes);    }    //数据读取完毕    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        //使用反射获找到目标方法进行返回        final byte[] respBytes = requestHandler.handleRequest(reqBytes);        ctx.writeAndFlush(respBytes);    }    //处理异常, 一般是需要关闭通道    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}
八.序列化协议
对计算机网络稍微有一点了解的同学都知道,数据在网络中传输是二进制的:01010101010101010,类似这种,只有二进制数据才能在网络中传输。但是在编码之前我们一般先进行序列化,目的是为了优化传输的数据量。因为有的数据太大,需要进行空间优化。

那么我们来区分一下序列化和编码:我画一张图大家都全明白了

<hr>
定义一个序列化协议,放入作为一个handler放入pipeline中。

Netty支持多种序列化,比如jdk,Json,ProtoBuf 等,这里使用ProtoBuf,其序列化后码流小性能高,非常适合RPC调用。接下来看怎么使用ProtoBuf?
    1.编写需要序列化的类xxx.proto:ProtoBuf有自己的语法规则(自行百度)

    2.通过官网提供的protoc.exe生成对应的Java代码3.前面通过工具生成的代码(AnimalProto)已经帮我们封装好了序列化和反序列化的方法,我们只需要调用对应方法即可

引入Protobuf的依赖
<dependency>    <groupId>com.google.protobuf</groupId>    <artifactId>protobuf-java</artifactId>    <version>2.4.1</version></dependency>
序列化:
/** * 调用对象构造好的Builder,完成属性赋值和序列化操作 * @return */public static byte[] protobufSerializer(){    AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();    builder.setId(1L);    builder.setName("小猪");    List<String> actions = new ArrayList<>();    actions.add("eat");    actions.add("run");    builder.addAllActions(actions);    return builder.build().toByteArray();}
反序列化:
/** * 通过调用parseFrom则完成反序列化 * @param bytes * @return * @throws InvalidProtocolBufferException */public static Animal deserialize(byte[] bytes) throws Exception {    AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);    Animal animal = new Animal();    animal.setId(pAnimal.getId());    animal.setName(pAnimal.getName());    animal.setActions(pAnimal.getActionsList());    return animal;}
测试:
public static void main(String[] args) throws Exception {    byte[] bytes = serializer();    Animal animal = deserialize(bytes);    System.out.println(animal);}
以下看到是能正常序列化和反序列化的:
九.通信协议
通信协议主要是解决网络传输问题,比如TCP拆包粘包问题。

TCP问题:
    TCP拆包粘包主要就是把一些数据合并或者分割开进行发送,这时候有的数据就不完整,有的数据就多出一部分,就会造成问题。一般使用TCP协议都需要考虑拆包粘包问题tcp粘包和半包问题就是因为滑动窗口。 因为不管你的数据是多少长度,怎么分割每一条数据。但是tcp只按照我滑动窗口的长度发送。本质是因为TCP是流式协议,消息无边界。

解决方案:业界的主流协议的解决方案可以归纳如下
    消息定长:例如每个报文的大小为固定长度100字节,如果不够用空格补足。(定长解码器)在包尾加特殊结束符进行分割。(分隔符编码器)

    消息长度+消息:将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段。Netty自带:

    自定义编解码器

这里只是列举出来编码过程,解码是逆过程。(说白了,编码就是找着固定的格式进行写入,解码就是照着固定的格式读)

<hr>
恭喜你,已经学会写RPC框架了,想深入了解的朋友可以参照源码。进行学习,升级。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-25 22:59 , Processed in 0.069147 second(s), 23 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表