rustum 发表于 2022-7-16 17:25

Netty编解码-Demo

package com.tuling.netty.codec;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ObjectEncoder;public class NettyClient {    public static void main(String[] args) throws Exception {      EventLoopGroup group = new NioEventLoopGroup();      try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(group).channel(NioSocketChannel.class)                  .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            //pipeline.addLast(new StringEncoder());                            pipeline.addLast(new ObjectEncoder());                            pipeline.addLast(new NettyClientHandler());                        }                  });            System.out.println("netty client start。。");            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();            channelFuture.channel().closeFuture().sync();      } finally {            group.shutdownGracefully();      }    }}package com.tuling.netty.codec;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class NettyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      System.out.println("收到服务器消息:" + msg);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {      System.out.println("MyClientHandler发送数据");      //ctx.writeAndFlush("测试String编解码");      //测试对象编解码      //ctx.writeAndFlush(new User(1,"zhuge"));      //测试用protostuff对对象编解码      ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "zhuge")));      ctx.writeAndFlush(buf);    }}package com.tuling.netty.codec;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {    public static void main(String[] args) throws Exception {      EventLoopGroup bossGroup = new NioEventLoopGroup(1);      EventLoopGroup workerGroup = new NioEventLoopGroup();      try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                  .channel(NioServerSocketChannel.class)                  .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            //pipeline.addLast(new StringDecoder());                            //pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(null)));                            pipeline.addLast(new NettyServerHandler());                        }                  });            System.out.println("netty server start。。");            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();            channelFuture.channel().closeFuture().sync();      } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();      }    }}package com.tuling.netty.codec;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class NettyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      //System.out.println("从客户端读取到String:" + msg.toString());      //System.out.println("从客户端读取到Object:" + ((User)msg).toString());         //测试用protostuff对对象编解码      ByteBuf buf = (ByteBuf) msg;      byte[] bytes = new byte;      buf.readBytes(bytes);      System.out.println("从客户端读取到Object:" + ProtostuffUtil.deserializer(bytes, User.class));    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {      cause.printStackTrace();      ctx.close();    }}package com.tuling.netty.codec;import com.dyuproject.protostuff.LinkedBuffer;import com.dyuproject.protostuff.ProtostuffIOUtil;import com.dyuproject.protostuff.Schema;import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;/** * protostuff 序列化工具类,基于protobuf封装 */public class ProtostuffUtil {    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();    private static <T> Schema<T> getSchema(Class<T> clazz) {      @SuppressWarnings("unchecked")      Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);      if (schema == null) {            schema = RuntimeSchema.getSchema(clazz);            if (schema != null) {                cachedSchema.put(clazz, schema);            }      }      return schema;    }    /**   * 序列化   *   * @param obj   * @return   */    public static <T> byte[] serializer(T obj) {      @SuppressWarnings("unchecked")      Class<T> clazz = (Class<T>) obj.getClass();      LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);      try {            Schema<T> schema = getSchema(clazz);            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);      } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);      } finally {            buffer.clear();      }    }    /**   * 反序列化   *   * @param data   * @param clazz   * @return   */    public static <T> T deserializer(byte[] data, Class<T> clazz) {      try {            T obj = clazz.newInstance();            Schema<T> schema = getSchema(clazz);            ProtostuffIOUtil.mergeFrom(data, obj, schema);            return obj;      } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);      }    }    public static void main(String[] args) {      byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));      User user = ProtostuffUtil.deserializer(userBytes, User.class);      System.out.println(user);    }}package com.tuling.netty.codec;import java.io.Serializable;public class User implements Serializable {                private int id;        private String name;        public User(){}        public User(int id, String name) {                super();                this.id = id;                this.name = name;        }        public int getId() {                return id;        }        public void setId(int id) {                this.id = id;        }        public String getName() {                return name;        }        public void setName(String name) {                this.name = name;        }        @Override        public String toString() {                return "User{" +                                "id=" + id +                                ", name='" + name + '\'' +                                '}';        }}
页: [1]
查看完整版本: Netty编解码-Demo