|
所谓的协议,是由语法、语义、时序这三个要素组成的一种规范,通信双方按照该协议规范来实现网络数据传输,这样通信双方才能实现数据正常通信和解析。
由于不同的中间件在功能方面有一定差异,所以其实应该是没有一种标准化协议来满足不同差异化需求,因此很多中间件都会定义自己的通信协议,另外通信协议可以解决粘包和拆包问题。
在本篇文章中,我们来实现一个自定义消息协议。
自定义协议的要素
自定义协议,那这个协议必须要有组成的元素,
魔数: 用来判断数据包的有效性版本号: 可以支持协议升级序列化算法: 消息正文采用什么样的序列化和反序列化方式,比如json、protobuf、hessian等指令类型:也就是当前发送的是一个什么类型的消息,像zookeeper中,它传递了一个Type请求序号: 基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理消息长度消息正文
协议定义
sessionId | reqType | Content-Length | Content |
其中Version,Content-Length,SessionId就是Header信息,Content就是交互的主体。
定义项目结构以及引入包
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId></dependency><dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency>
项目结构如图4-1所示:
netty-message-mic : 表示协议模块。netty-message-server :表示nettyserver。
<center>图4-1</center>
在nettyMessage-mic中,包的结构如下。
image-20210831103346370
定义Header
表示消息头
@Datapublic class Header{ private long sessionId; //会话id : 占8个字节 private byte type; //消息类型: 占1个字节 private int length; //消息长度 : 占4个字节}定义MessageRecord
表示消息体
@Datapublic class MessageRecord{ private Header header; private Object body;}OpCode
定义操作类型
public enum OpCode { BUSI_REQ((byte)0), BUSI_RESP((byte)1), PING((byte)3), PONG((byte)4); private byte code; private OpCode(byte code) { this.code=code; } public byte code(){ return this.code; }}定义编解码器
分别定义对该消息协议的编解码器
MessageRecordEncoder
@Slf4jpublic class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception { log.info("===========开始编码Header部分==========="); Header header=record.getHeader(); byteBuf.writeLong(header.getSessionId()); //保存8个字节的sessionId byteBuf.writeByte(header.getType()); //写入1个字节的请求类型 log.info("===========开始编码Body部分==========="); Object body=record.getBody(); if(body!=null){ ByteArrayOutputStream bos=new ByteArrayOutputStream(); ObjectOutputStream oos=new ObjectOutputStream(bos); oos.writeObject(body); byte[] bytes=bos.toByteArray(); byteBuf.writeInt(bytes.length); //写入消息体长度:占4个字节 byteBuf.writeBytes(bytes); //写入消息体内容 }else{ byteBuf.writeInt(0); //写入消息长度占4个字节,长度为0 } }}MessageRecordDecode
@Slf4jpublic class MessageRecordDecode extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { MessageRecord record=new MessageRecord(); Header header=new Header(); header.setSessionId(byteBuf.readLong()); //读取8个字节的sessionid header.setType(byteBuf.readByte()); //读取一个字节的操作类型 record.setHeader(header); //如果byteBuf剩下的长度还有大于4个字节,说明body不为空 if(byteBuf.readableBytes()>4){ int length=byteBuf.readInt(); //读取四个字节的长度 header.setLength(length); byte[] contents=new byte[length]; byteBuf.readBytes(contents,0,length); ByteArrayInputStream bis=new ByteArrayInputStream(contents); ObjectInputStream ois=new ObjectInputStream(bis); record.setBody(ois.readObject()); list.add(record); log.info("序列化出来的结果:"+record); }else{ log.error("消息内容为空"); } }}测试协议的解析和编码
EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的
public class CodesMainTest { public static void main( String[] args ) throws Exception { EmbeddedChannel channel=new EmbeddedChannel( new LoggingHandler(), new MessageRecordEncoder(), new MessageRecordDecode()); Header header=new Header(); header.setSessionId(123456); header.setType(OpCode.PING.code()); MessageRecord record=new MessageRecord(); record.setBody("Hello World"); record.setHeader(header); channel.writeOutbound(record); ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(); new MessageRecordEncoder().encode(null,record,buf); channel.writeInbound(buf); }}编码包分析
运行上述代码后,会得到下面的一个信息
+-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |.......@........||00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64 |.t..Hello World |+--------+-------------------------------------------------+----------------+
按照协议规范:
前面8个字节表示sessionId一个字节表示请求类型4个字节表示长度后面部分内容表示消息体
测试粘包和半包问题
通过slice方法进行拆分,得到两个包。ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。 public class CodesMainTest { public static void main( String[] args ) throws Exception { //EmbeddedChannel是netty专门针对ChannelHandler的单元测试而提供的类。可以通过这个类来测试channel输入入站和出站的实现 EmbeddedChannel channel=new EmbeddedChannel( //解决粘包和半包问题// new LengthFieldBasedFrameDecoder(2048,10,4,0,0), new LoggingHandler(), new MessageRecordEncoder(), new MessageRecordDecode()); Header header=new Header(); header.setSessionId(123456); header.setType(OpCode.PING.code()); MessageRecord record=new MessageRecord(); record.setBody("Hello World"); record.setHeader(header); channel.writeOutbound(record); ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(); new MessageRecordEncoder().encode(null,record,buf); //*********模拟半包和粘包问题************// //把一个包通过slice拆分成两个部分 ByteBuf bb1=buf.slice(0,7); //获取前面7个字节 ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); //获取后面的字节 bb1.retain(); channel.writeInbound(bb1); channel.writeInbound(bb2); }}
运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。
2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 00 00 00 00 00 01 e2 |....... |+--------+-------------------------------------------------+----------------+2021-08-31 15:53:01,397 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETEException in thread "main" io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))解决拆包问题
LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。
首先来说明一下该解码器的核心参数
lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置lengthFieldLength,长度字段锁占用的字节数lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
public class CodesMainTest { public static void main( String[] args ) throws Exception { EmbeddedChannel channel=new EmbeddedChannel( //解决粘包和半包问题 new LengthFieldBasedFrameDecoder(1024, 9,4,0,0), new LoggingHandler(), new MessageRecordEncoder(), new MessageRecordDecode()); Header header=new Header(); header.setSessionId(123456); header.setType(OpCode.PING.code()); MessageRecord record=new MessageRecord(); record.setBody("Hello World"); record.setHeader(header); channel.writeOutbound(record); ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(); new MessageRecordEncoder().encode(null,record,buf); //*********模拟半包和粘包问题************// //把一个包通过slice拆分成两个部分 ByteBuf bb1=buf.slice(0,7); ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); bb1.retain(); channel.writeInbound(bb1); channel.writeInbound(bb2); }}
添加一个长度解码器,就解决了拆包带来的问题。运行结果如下
2021-08-31 16:09:35,115 [com.netty.example.codec.MessageRecordDecode]-[INFO] 序列化出来的结果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World)2021-08-31 16:09:35,116 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE基于自定义消息协议通信
下面我们把整个通信过程编写完整,代码结构如图4-2所示.
image-20210831175056500
<center>图4-2</center>
服务端开发
@Slf4jpublic class ProtocolServer { public static void main(String[] args){ EventLoopGroup boss = new NioEventLoopGroup(); //2 用于对接受客户端连接读写操作的线程工作组 EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) //绑定两个工作线程组 .channel(NioServerSocketChannel.class) //设置NIO的模式 // 初始化绑定服务通道 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline() .addLast( new LengthFieldBasedFrameDecoder(1024, 9,4,0,0)) .addLast(new MessageRecordEncoder()) .addLast(new MessageRecordDecode()) .addLast(new ServerHandler()); } }); ChannelFuture cf= null; try { cf = b.bind(8080).sync(); log.info("ProtocolServer start success"); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { work.shutdownGracefully(); boss.shutdownGracefully(); } }}ServerHandler
@Slf4jpublic class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MessageRecord messageRecord=(MessageRecord)msg; log.info("server receive message:"+messageRecord); MessageRecord res=new MessageRecord(); Header header=new Header(); header.setSessionId(messageRecord.getHeader().getSessionId()); header.setType(OpCode.BUSI_RESP.code()); String message="Server Response Message!"; res.setBody(message); header.setLength(message.length()); ctx.writeAndFlush(res); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("服务器读取数据异常"); super.exceptionCaught(ctx, cause); ctx.close(); }}客户端开发
public class ProtocolClient { public static void main(String[] args) { //创建工作线程组 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 9,4,0,0)) .addLast(new MessageRecordEncoder()) .addLast(new MessageRecordDecode()) .addLast(new ClientHandler()); } }); // 发起异步连接操作 try { ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8080)).sync(); Channel c = future.channel(); for (int i = 0; i < 500; i++) { MessageRecord message = new MessageRecord(); Header header = new Header(); header.setSessionId(10001); header.setType((byte) OpCode.BUSI_REQ.code()); message.setHeader(header); String context="我是请求数据"+i; header.setLength(context.length()); message.setBody(context); c.writeAndFlush(message); } //closeFuture().sync()就是让当前线程(即主线程)同步等待Netty server的close事件,Netty server的channel close后,主线程才会继续往下执行。closeFuture()在channel close的时候会通知当前线程。 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } }}ClientHandler
@Slf4jpublic class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MessageRecord record=(MessageRecord)msg; log.info("Client Receive message:"+record); super.channelRead(ctx, msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); }} |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|