|
用过Netty的人明显表现对它的偏爱,有没有?!
为什么要用netty再实现一遍?
上一篇已经实现了串口通信。当然,简单实现还是远远不够的。即使是串口通信,也需要对收发数据进行编解码处理吧?也需要保证数据的完整性吧?也需要协议吧?也需要把业务逻辑的部分单独处理吧?
下面上代码:
1.编解码类;
import java.util.List;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;/** * 对收到的数据进行解码 * @author 程就人生 * @Date */public class ByteArrayDecoder extends ByteToMessageDecoder{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 标记一下当前的readIndex的位置 in.markReaderIndex(); int dataLength = in.readableBytes(); byte[] array = new byte[dataLength]; in.readBytes(array, 0, dataLength); if(array.length > 0){ out.add(array); } }}import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import lombok.extern.slf4j.Slf4j;/** * 对发出的数据进行编码 * @author 程就人生 * @Date */@Slf4jpublic class ByteArrayEncoder extends MessageToByteEncoder<byte[]>{ @Override protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception { log.info(".....经过ByteArrayEncoder编码....."); //消息体,包含我们要发送的数据 out.writeBytes(msg); }}
2.数据接收处理类;
import org.springframework.stereotype.Service;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.ReferenceCountUtil;import io.netty.channel.ChannelHandler;/** * 串口接收数据处理器 * @author 程就人生 * @Date */@Service("rxtxHandler")@ChannelHandler.Sharablepublic class RxtxHandler extends SimpleChannelInboundHandler<byte[]>{ @Override protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception { //文本方式编解码,String //System.out.println("接收到:"+msg); // 十六进制发送编解码 int dataLength = msg.length; ByteBuf buf = Unpooled.buffer(dataLength); buf.writeBytes(msg); System.out.println("接收到:"); while(buf.isReadable()){ System.out.print(" " + buf.readByte()); } System.out.println(""); // 释放资源 ReferenceCountUtil.release(msg); }}
3.串口参数配置类;
import io.netty.channel.rxtx.RxtxChannelConfig;import io.netty.channel.rxtx.RxtxChannelConfig.Databits;import io.netty.channel.rxtx.RxtxChannelConfig.Paritybit;import io.netty.channel.rxtx.RxtxChannelConfig.Stopbits;import lombok.Data;/** * 串口参数配置类 * @author * @date 2022年4月26日 * */@Datapublic final class SerialPortParam { /** * 串口名称,以COM开头(COM0、COM1、COM2等等) */ private String serialPortName; /** * 波特率, 默认:115200 */ private int baudRate = 115200; /** * 数据位 默认8位 * 可以设置的值:SerialPort.DATABITS_5、SerialPort.DATABITS_6、SerialPort.DATABITS_7、SerialPort.DATABITS_8 */ private Databits dataBits = RxtxChannelConfig.Databits.DATABITS_8; /** * 停止位 * 可以设置的值:SerialPort.STOPBITS_1、SerialPort.STOPBITS_2、SerialPort.STOPBITS_1_5 */ private Stopbits stopBits = RxtxChannelConfig.Stopbits.STOPBITS_1; /** * 校验位 * 可以设置的值:SerialPort.PARITY_NONE、SerialPort.PARITY_ODD、SerialPort.PARITY_EVEN、SerialPort.PARITY_MARK、SerialPort.PARITY_SPACE */ private Paritybit parity = RxtxChannelConfig.Paritybit.NONE;}
4.netty整合串口的启动类;
import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executors;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoop;import io.netty.channel.EventLoopGroup;import io.netty.channel.oio.OioEventLoopGroup;import io.netty.channel.rxtx.RxtxChannel;import io.netty.channel.rxtx.RxtxDeviceAddress;import io.netty.util.concurrent.GenericFutureListener;import lombok.Data;import lombok.extern.slf4j.Slf4j;/** * 串口接收数据的服务端 * @author 程就人生 * @Date */@Slf4j@Data@Componentpublic class RxtxServer { private RxtxChannel channel; private SerialPortParam serialPortParam; @Autowired private RxtxHandler handler; public void createRxtx() throws Exception { // 串口使用阻塞io EventLoopGroup group = new OioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channelFactory(() -> { RxtxChannel rxtxChannel = new RxtxChannel(); rxtxChannel.config() .setBaudrate(serialPortParam.getBaudRate()) // 波特率 .setDatabits(serialPortParam.getDataBits()) // 数据位 .setParitybit(serialPortParam.getParity()) // 校验位 .setStopbits(serialPortParam.getStopBits()); // 停止位 return rxtxChannel ; }) .handler(new ChannelInitializer<RxtxChannel>() { @Override protected void initChannel(RxtxChannel rxtxChannel) { rxtxChannel.pipeline().addLast(// new LineBasedFrameDecoder(60000), // 文本形式发送编解码// new StringEncoder(StandardCharsets.UTF_8),// new StringDecoder(StandardCharsets.UTF_8), // 十六进制形式发送编解码 new ByteArrayDecoder(), new ByteArrayEncoder(), handler ); } }); ChannelFuture f = bootstrap.connect(new RxtxDeviceAddress(serialPortParam.getSerialPortName())).sync(); f.addListener(connectedListener); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } // 连接监听 GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> { final EventLoop eventLoop = f.channel().eventLoop(); if (!f.isSuccess()) { log.info("连接失败"); }else{ channel = (RxtxChannel) f.channel(); log.info("连接成功"); sendData(); } }; /** * 发送数据 */ public void sendData(){ // 十六机制形式发送 ByteBuf buf = Unpooled.buffer(2); buf.writeByte(3); buf.writeByte(2); channel.writeAndFlush(buf.array()); // 文本形式发送 //channel.writeAndFlush("2"); } public void start(){ CompletableFuture.runAsync(()->{ try { // 阻塞的函数 createRxtx(); } catch (Exception e) { e.printStackTrace(); } }, Executors.newSingleThreadExecutor());//不传默认使用ForkJoinPool,都是守护线程 }}
串口连接成功后,发送一次数据。
5.把串口启动类放在入口程序里,实际业务中根据实际情况调整;
@SpringBootApplicationpublic class SpringBootSqliteApplication { public static void main(String[] args) { //获取application的上下文 ApplicationContext applicationContext = SpringApplication.run(SpringBootSqliteApplication.class, args); // 串口连接服务类 RxtxServer rxtxServer = applicationContext.getBean(RxtxServer.class); SerialPortParam serialPort = new SerialPortParam(); // 连接串口com1 serialPort.setSerialPortName("COM1"); rxtxServer.setSerialPortParam(serialPort); rxtxServer.start(); try { // 连接串口需要一点时间,这里稍微等待一下 Thread.currentThread().sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } // 发送数据 rxtxServer.sendData(); }}
这里又发送了一次数据,所以一共发送了两次数据。
6.运行项目;
在运行项目前,把串口调试工具设置一下,发送数据采用Hex,接收数据也是Hex;如果是String类型,则可以去掉这项勾选;并且设置数据5s中发送一次;
image.png
在控制台,可以看到接收到的数据;
image.png
在串口调试工具这里,又接收到了两组数据。
image.png
最后总结
以上便是netty关于串口采用字节数组的形式通讯的编码。整个编码流程和TCP协议通讯的流程一样。但是,Netty关于串口的编程,相关类相关方法已经过时了。为什么过时了,还有没有什么更好的架包来替代,这是在生产中需要持续关注的。参考资料:
串口调试工具下载:
http://www.zlmcu.com/document/com_debug_tools.html
http://www.zlmcu.com/download.htm
虚拟串口工具下载:
https://www.iotplat.top/#/iot/soft/604ed56de1457928489bf826
相关文档:
Java串口编程RXTX,距离硬件又近了一点
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端
Netty中的编解码,至少知道这两种
Netty 之 IdleStateHandler 心跳检测(部分源码分析),实现超时断开连接
Netty 之 ByteBuf 几种分配方案 及 内存溢出相关Bug
Netty整合HTTP协议,实现文件目录服务器
Netty整合JBoss Marshalling编解码
Netty整合Protobuf编解码,并解决半包问题
Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题
Netty中粘包拆包,小试牛刀
SpringBoot整合Netty与websocket客户端进行对话 |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|