找回密码
 立即注册
查看: 388|回复: 0

Netty自定义Codec实现ProtoBuf传输多种类型对象

[复制链接]
发表于 2021-10-14 15:10 | 显示全部楼层 |阅读模式
传统方式如下:

加上这个解码器进行解码
pipeline.addLast(new ProtobufDecoder(UserPOJO.User.getDefaultInstance()));
加上下面这个编码器进行编码
pipeline.addLast(new ProtobufEncoder());
但是这样的方式有一个很大的弊端,那就是他并不能解析多种类型的ProtoBuf传输的对象,因为UserPOJO.User.getDefaultInstance()中已经写死了解码器可以解析的对象类型,那么我们该如何去做呢?

可以自定义协议+自定义ProtoBuf编解码器去进行解决

我们先封装一个协议
msg长度的低8bit | msg长度的高8bit | unused | 对象类型 | ...msg...
我们自己封装一个header,其中高两字节代表长度,最低字节代表对象的类型。

我们先来分析一下,2个byte去定义长度,因此可以表示对象的最大长度为65535B,也就是64KB;1byte定义类型,可以有255种类型,暂时勉强够用。通过长度还能判断一下后面传输的对象的长度,还能解决TCP的粘包问题,足够使用。

我们先定义一个常量类
public class MessageProtocolConstants {    public static int BYTES_OF_HEADER = 4;  //header的长度    public static int INDEX_OF_HEADER_LOW_8_BITS = 0;  //在header中低8位所在的索引    public static int INDEX_OF_HEADER_HIGH_8_BITS = 1;  //在header中高8位所在的索引    public static int INDEX_OF_UNUSED = 2;   //在header中尚未使用的bit    public static int INDEX_OF_HEADER_TYPE = 3;  //在header中type所在的索引    public static int MASK_OF_HIGH_8_BITS = 0x00ff;  //从header中提取出来长度的高8Bit    public static int MASK_OF_LOW_8_BITS = 0xff00;  //从header中提取出来长度的低8Bit    public static byte HEADER_OF_TYPE_USER = 0x01;  //User类型的Type枚举值    public static byte HEADER_OF_TYPE_STUDENT = 0x02;  //Student类型的Type枚举值}
下面是自定义的编码器
public class CustomProtoBufEncoder extends MessageToByteEncoder<MessageLite> {    @Override    protected void encode(ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {        byte[] body = msg.toByteArray();  //将消息转成字节数组        byte[] header = generateHeader(msg, (short) body.length);  //根据Type生成header        out.writeBytes(header);  //写header        out.writeBytes(body);  //写body    }    //short占用2Bytes,有16个bit,可以代表的最大的数是65535,因此可以代表最大的长度为64KB,应该足够一个对象的传输了    private byte[] generateHeader(MessageLite msg, short length) throws Exception {        byte typeOfObject;  //要进行传输的对象的类型        if (msg instanceof UserPOJO.User) {            typeOfObject = MessageProtocolConstants.HEADER_OF_TYPE_USER;        } else if (msg instanceof UserPOJO.Student) {            typeOfObject = MessageProtocolConstants.HEADER_OF_TYPE_STUDENT;        } else {            throw new NotSupportedException("不支持" + msg.getClass() + "类型该对象的传输");        }        byte[] header = new byte[MessageProtocolConstants.BYTES_OF_HEADER];        //byte[0..1]表示消息的长度,byte[2]暂时保留不进行使用,byte[3]存放类型        header[MessageProtocolConstants.INDEX_OF_HEADER_LOW_8_BITS] = (byte) (length & MessageProtocolConstants.MASK_OF_LOW_8_BITS);  //低8位长度        header[MessageProtocolConstants.INDEX_OF_HEADER_HIGH_8_BITS] = (byte) (length & MessageProtocolConstants.MASK_OF_HIGH_8_BITS);  //高8位长度        header[MessageProtocolConstants.INDEX_OF_HEADER_TYPE] = typeOfObject;  //对象类型        return header;    }}
下面是自定义的解码器
public class CustomProtoBufDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        //如果可读的字节数量还不够头部的长度,那么直接return...        if (in.readableBytes() <= MessageProtocolConstants.BYTES_OF_HEADER) {            return;        }        byte[] header = new byte[MessageProtocolConstants.BYTES_OF_HEADER];        in.readBytes(header); //读取header        int length = (header[MessageProtocolConstants.INDEX_OF_HEADER_HIGH_8_BITS] << 8)                + header[MessageProtocolConstants.INDEX_OF_HEADER_LOW_8_BITS];  //高8bit左移8位+低8bit得到长度        byte type = header[MessageProtocolConstants.INDEX_OF_HEADER_TYPE];  //解析传递的出来type类型        byte[] body = new byte[length]; //创建指定长度的byte[]用来去进行读取body        in.readBytes(body);  //读取body...        MessageLite object = getObject(body, type);  //解析对象        out.add(object);  //加入到结果列表当中    }    private MessageLite getObject(byte[] body, byte type) throws Exception {        if (type == MessageProtocolConstants.HEADER_OF_TYPE_USER) {            return UserPOJO.User.getDefaultInstance().getParserForType().parseFrom(body);        } else if (type == MessageProtocolConstants.HEADER_OF_TYPE_STUDENT) {            return UserPOJO.Student.getDefaultInstance().getParserForType().parseFrom(body);        }        throw new UnsupportedOperationException("不支持解析该类型的对象");    }}
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 18:36 , Processed in 0.063668 second(s), 22 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表