|
什么是 Netty
Netty 是一个高性能的网络通信框架,封装了底层复杂的 socket 编程细节,让我们可以高效快速构建自己的应用
有哪些开源框架用了 Netty 呢?grpc、dubbo、kafka、rocketmq、zookeeper、hadoop
Netty Demo
server 端启动 netty 服务器
public static void main(String[] args) throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 // 控制窗口值 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new JdrpcCodec()); channelPipeline.addLast(new JdrpcServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(JdrpcConstant.PORT).sync(); channelFuture.addListener(future -> { if (future.isSuccess()) { logger.info("服务启动成功,绑定端口: {}", JdrpcConstant.PORT); } else { logger.error("服务启动失败"); } }); channelFuture.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); }}
启动 client
public static void main(String[] args) throws InterruptedException { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) // 连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 保持长连接心跳 .option(ChannelOption.SO_KEEPALIVE, true) // 禁用 Nagle 算法 .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 设置编码解码器 .addLast(new JdrpcCodec()) // 业务逻辑处理类 .addLast(new JdrpcClientHandler()); } }); bootstrap.connect("127.0.0.1", JdrpcConstant.PORT).addListener(future -> { if (future.isSuccess()) { logger.info("连接成功, 启动控制台"); waitInputMessage(((ChannelFuture) future).channel()); } else { logger.error("连接失败"); } });}通信协议
通信协议用来指定调用方和被调用方通信的规则,满足通信协议字节数组即为一个完整的数据包
我们来看看阿里的 dubbo 协议(rpc 协议)
某某公司的 rpc 协议,先要读取到一个 \n 换行符然后得到数据包长度 length 字段,然后读取 length 长度的内容
<=4 字节 | 1字节 | 不确定 | length | \n | content | 序列化协议
序列化协议用来指定通信协议中的内容部分该如何序列化传输,所有协议的设计无非是下面几个点
(1)可读性
(2)编解码压缩解压缩效率
(3)压缩后包的大小
json 序列化:将需要传输的对象序列化为 json 字符串,同时获取字符串的字节数组填充内容,jdrpc 采用的就是 json 序列化
hessian 序列化、protobuf 序列化:采用一些高效的序列化算法让内容数据包足够小,压缩效率足够快,最后直接序列化为字节数组
各种序列化协议的性能比较,序列化后数据包大小比较
缓冲区 ByteBuf
在读写 socket 数据的时候是基于缓冲区进行数据读写的,缓冲区分为 2 大类堆缓冲区、堆外缓冲区
当网卡收到数据后会请求中断将内核将数据通过 DMA 拷贝到 tcp 缓冲区中,如果是边缘触发机制的话收到数据就会通知 read 事件准备就绪,此时进程可以开始读取数据
堆缓冲区:数据是存储在 Java 堆中的,通过 CPU 拷贝内核 tcp 缓冲区数据到用户态,然后 CPU 拷贝用户态数据到 Java 堆中
堆外缓冲区(也叫直接缓冲区):数据是存储在非堆的,通过 CPU 拷贝内核 tcp 缓冲区数据到用户态,相比堆缓冲区少了一次数据拷贝
需要注意的是当我们使用的是堆缓冲区,那么发送数据的时候需要将堆缓冲区先拷贝到直接缓冲区,然后基于直接缓冲区进行数据的发送,因为底层网络传输的是需要传递不可变的引用地址,而 Java 堆是会随着 gc 而改变位置的
复合缓冲区:聚合了多个 ByteBuf
什么时候用堆缓冲区?什么时候用堆外缓冲区?
netty 中的缓冲区ByteBuf 设计
该图来自于, 跟着闪电侠学netty
编码
按照通信协议的规范来组织数据,将需要传输的对象编码为字节数组进行传输,如以下的编码方式为 jdrpc 协议通信方式
(1)写入数据包长度
(2)写入分隔符
(3)采用 JSON 序列化内容,将其转换为 JSON 的字节数组jdrpc 协议这里的设计如果数据包长度是固定 4 个字节那么就没有必要写入分隔符了,当然如果长度不是固定的如 1-4 字节那么确实有分隔符存在的必要 protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { JSONObject object = (JSONObject) msg; String content = object.toJSONString(); // 获取内容的字节数组 byte[] bytes = content.getBytes(StandardCharsets.UTF_8); // 写入数据包长度 // int 为 4 字节 out.writeInt(bytes.length); // 写入分隔符 out.writeByte('\n'); // 写入字节数组 out.writeBytes(bytes);}解码
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { // 寻找 \n 分隔符 int i = in.forEachByte(ByteProcessor.FIND_LF); if (i < 0) { return; } // 找到了分隔符 // 读取数据包长度 // 记录当前读取到的 index 标记 in.markReaderIndex(); byte[] lengthBytes = new byte; in.readBytes(lengthBytes); // 获取长度,读数据数据包 int length = Util.byteToInt(lengthBytes); logger.info("获取到的长度: {}", length); // tcp 缓冲区的数据是否满足一个数据包长度的要求 // 满足就进行粘包,不满足则继续等待数据包 if (in.readableBytes() < length) { // 还原读取的位置 in.resetReaderIndex(); return; } // 跳过 \n in.skipBytes(1); // 读取数据 byte[] content = new byte[length]; in.readBytes(content); String res = new String(content, StandardCharsets.UTF_8); out.add(res); // 释放缓冲区已经处理的字节避免内存泄露 ReferenceCountUtil.release(msg);}
当被调用方收到请求时候,按照通信协议的规范进行解码出来请求内容,比如 jdrpc 协议
(1)遍历 tcp 缓冲区寻找 \n 换行符,记录其偏移量
(2)读取偏移量前的字节数组将其转换为 int 得到内容的长度 length
(3)跳过 \n 换行符字节
(4)读取 length 个字节数据,将其转换为 String 即为 JSON 字符串
当我们在读取缓冲区数据进行解码的时候会出现以下几种情况
1. 遍历 tcp 缓冲区数据结果没有找到分隔符 \n继续向 selector 注册 OP_READ 事件,当后续数据到达 tcp 缓冲区时候基于边缘触发机制,select 感知到读事件返回,继续处理 tcp 缓冲区数据直到找到了 \n
2. 找到了 \n 解析出来内容长度为 5,结果检索 tcp 缓冲区中数据只有 3字节 不满足一个数据包要求3. 找到了 \n 解析出来内容长度为 5,结果检索 tcp 缓冲区中居然有 8 字节的数据多了3个字节值读取 \n 后 5 字节数据形成一个完整的数据包后 length,然后跳过一个字节的分隔符,再次去缓冲区读取 length 个字节,多出来的字节不做处理,就后续 rpc 逻辑调用,剩下的数据再次走从第一步开始的逻辑
4. tcp 缓冲区的数据刚好满足一个数据包在这种情况下 tcp 缓冲区数据不多不少刚刚满足一个数据包的大小
那么什么情况下才会出现无法收到完整数据包的情况呢? 比如 1 和 2
我们知道以太网数据链路层的 MTU 数据包默认为最大 1500 字节,我们的 TCP 数据包内购传递的最大内容 MSS = 1500 - IP 头部 - tcp 头部 = 1460 字节
编码后的单个数据包太大,大于 1500 字节
数据包大于了单词数据链路层传递的最大长度,就会分批发送
接收缓冲区几乎被打满了
TCP 具备拥塞控制功能,当服务器接收缓冲区只剩下 2 字节空间的时候,那么客户端如果一股脑的无脑发数据,数据存储到哪去?所以客户端也只会发送 2 字节,当接收缓冲区为 0 的时候,客户端就会暂停发送
网卡流量几乎被打满了
尽管当前 socket 接收缓冲区还有充足空间比如 2M,但是网卡被其它请求打满了,只有 2 字节的剩余,这个时候我们也只会收到 <= 2 字节的数据包
服务处理速度太慢
比如操作耗时较久没有及时释放空间,导致 TCP 的释放速度跟不上新增速度,这也是导致第三点的原因之一
注:Netty 采用的是边缘触发模式,内核组装完成一个 TCP 数据包后就会告知用户线程可以处理了,这个时候就会拿到不完整的数据
长连接保活机制
当客户端和服务端建立好长连接后,如果一方长时间没有请求那么有可能这个链接已经断开了,这时候为了避免资源浪费是需要释放对应的资源的
系统层面的保活:时效性低,资源释放不及时默认 linux 系统可以通过 keepalive 机制来进行存活的探测,默认 2 小时如果没有收到数据包,就发送一个心跳数据包给对方,如果返回了 ack 表明连接正常,如果超时未收到响应则一共会重试 9 次间隔 75S 最后没有成功的话就释放长连接资源 应用层面的保活:时效性高,资源释放及时在应用层面创建定时任务每个指定时间发送一次心跳,如果心跳失败则进行重试还是关闭连接等操作 public class JdIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 15; public JdIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接"); ctx.channel().close(); }}Channel Pipline
服务端接收到请求后,当做一个 inbound 接入的流处理,按照添加的顺序依次调用 ChannelInboundHandler 进行处理
服务端响应请求的时候,当做一个 outbound 流处理,按照添加的顺序,从尾部到头部执行 ChannelOutboundHandler
在 server 端 handler pipline 又分为了 2 个,一个作为 server 处理建立长连接的 pipline,另外一个作为 socket 读写请求的 pipline,添加方式如下
serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023) // 添加处理建立连接的 pipline .handler(new ChannelInitializer<NioServerSocketChannel>() { @Override protected void initChannel(NioServerSocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerAcceptHandler()); } }) // 添加处理读写事件的 pipline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new JdrpcCodec()); channelPipeline.addLast(new JdrpcServerHandler()); } });
这里都是我们自定义的 pipline,除此之外系统还会向其中添加默认的 handler,做一些系统默认的操作,最后 pipline 链条如下
ServerBootstrapAcceptor 就是负责选择一个 NioEventLoop 然后将 NioSocketChannel 绑定到其中,后续 NioEventLoop 线程将 NioSocketChannel 注册到对应的 Selector 中就能感知到读写事件了
池化缓冲区
缓冲区的创建和释放都是有一定成本的,如果能进行池化复用将会提升性能
通过 ByteBufAllocator 来创建和使用池化的 ByteBuf
通过 Unpooled 来申请未此话的 ByteBuf
缓冲区的另外一块意义在于减少用户态到内核态切换以及数据拷贝的次数
如下通过创建 1亿次 1MB 空间使用后释放内存,观察他们的GC耗时
注:该案例和测试来自于 Netty 进阶之路-李林锋
(1)内存池模式 gc 32 次,耗时 37.6ms
(2)非内存池模式 gc 3038 次,耗时 2.35S
图解工作原理
![netty工作原理 (1)]
默认一个 NioEventLoop 线程关联一个 selector,服务端创建 ServerSocketChannel 将其注册到 Selector 中,关注 OP_ACCEPT 事件
Server 的事件由其对应的 handler 来处理,例如我们想要处理在建立连接时候的行为,Server 在启动的时候还会默认为 Server 的 Handler Pipline 添加一个 ServerBootstrapAcceptor handler,这样当底层三次握手完成后会先调用 AcceptServerHandler 然后再调用 ServerBootstrapAcceptor
serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) // 设置接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 设置发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 32 * 1023) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new AcceptServerHandler()); } })
ServerBootstrapAcceptor 的作用取出与客户端建立好连接 NioSocketChannel 然后选择一个 NioEventLoop 将其与之绑定,并且注册到其中的 Selector 中,关注读写事件
当读事件就绪后会交由 NioByteUnsafe 进行处理,读取到数据后交由编解码 handler 进行处理,形成一个完整的数据包后交给 JdrpcServerHandler 业务进行处理,这是有按照 Pipline 添加的 inbound 顺序依次调用
当写数据的时候,会将数据压入队列,flush 的时候统一将数据发送给客户端
IO 模型
linux 下的网络模型netty 如何指定网络模型非阻塞 IO:案列中我们默认使用的是 NioEventLoop 和 NioSocketChannel 这 2 种模式是 netty 进行优化后能够运行在个平台上的,底层是基于非阻塞 IO 实现的
多路复用 IO:替换使用 EpollEventLoop 和 EpollSocketChannel 即可切换为 epoll 模式,但是 epoll 模式只在 linux 下面执行,当能够确定 netty 是运行在 linux 中时候采用 epoll 能够获得更高的性能 |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|