super1 发表于 2023-2-1 13:06

rust 版 server,use protobuf

衔接上一篇。
这是我的rust版server。
基础支持,conn.rs:
use bytes::{Buf,BytesMut};
use std::io::{self,Cursor};
use tokio::io::{AsyncReadExt,AsyncWriteExt,ReadHalf,WriteHalf,BufWriter};
use tokio::net::TcpStream;

const HEAD_LEN: u32 = 4;

#
enum MyParse<T> {
    InComplete,
    Got(T),
}

#
pub struct ConnWriter {
    writer: BufWriter<WriteHalf<TcpStream>>,
}

impl ConnWriter {
    pub fn new(wr: WriteHalf<TcpStream>) -> Self {
      ConnWriter {
            writer: BufWriter::<WriteHalf<TcpStream>>::new(wr),
      }
    }
   
    pub async fn write_message(&mut self,vec_bytes: Vec<u8>) -> io::Result<()> {
      self.writer.write_u32(vec_bytes.len() as u32).await.unwrap();
      self.writer.write(&vec_bytes).await.unwrap();
      self.writer.flush().await.unwrap();
      Ok(())
    }
}

#
pub struct ConnReader {
    reader: ReadHalf<TcpStream>,
    buffer: BytesMut,
}

impl ConnReader {
    pub fn new(rd: ReadHalf<TcpStream>) -> ConnReader {
      ConnReader {
            reader: rd,                                                                                          
            buffer: BytesMut::with_capacity(4*1024),
      }
    }
   
    pub async fn read_message(&mut self) -> crate::Result<Option<Vec<u8>>> {
      loop {
            match self.parse_message() {
                Ok(MyParse::Got(vec_bytes)) => return Ok(Some(vec_bytes)),
                Ok(MyParse::InComplete) => {}, //we need more data!
                Err(err) => return Err(err.into()),
            }
            
            //when exception,return Err(error).
            if 0 == self.reader.read_buf(&mut self.buffer).await? {
                if self.buffer.is_empty() {
                  return Ok(None);
                } else {
                  return Err("read 0 bytes and connection reset by peer!".into());
                }                                                                                                         
            }
      }
    }

    fn parse_message(&mut self) -> crate::Result<MyParse<Vec<u8>>> {
      let mut cursor = Cursor::new(&self.buffer[..]);
      if cursor.remaining() > HEAD_LEN as usize {
            let body_len: u32 = cursor.get_u32();
            if cursor.remaining() >= body_len as usize {
                let body_bytes = cursor.copy_to_bytes(body_len as usize);
                self.buffer.advance((HEAD_LEN + body_len) as usize);
                Ok(MyParse::Got(body_bytes.to_vec()))
            } else {
                Ok(MyParse::InComplete)
            }
      } else {
            Ok(MyParse::InComplete)
      }
    }
   
}

pub fn get_reader_writer(stream: TcpStream) -> (ConnReader,ConnWriter) {
    let (rd,wr) = tokio::io::split(stream);
    let conn_reader = ConnReader::new(rd);
    let conn_writer = ConnWriter::new(wr);
    (conn_reader,conn_writer)
}

主入口main.rs:
pub mod conn;
// Include the `items` module, which is generated from items.proto.
pub mod orange_msgs {
    include!("orange.domain.rs");
}

use tokio::net::TcpListener;
use tokio::sync::mpsc::{self,Sender};
use std::io::Cursor;
use prost::Message;
use crate::conn::ConnReader;
//use orange_msgs::msg::Type as msgtype;
use orange_msgs::msg::Body as msgbody;

pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T,Error>;

pub fn serialize_msg(msg: &orange_msgs::Msg) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.reserve(msg.encoded_len());
    // Unwrap is safe, since we have reserved sufficient capacity in the vector.
    msg.encode(&mut buf).unwrap();
    buf
}

pub fn deserialize_msg(buf: &Vec<u8>) -> Result<orange_msgs::Msg> {
    match orange_msgs::Msg::decode(&mut Cursor::new(buf)) {
      Ok(msg) => Ok(msg),
      Err(e) => Err(Box::new(e)),
    }
}

#
async fn main(){
    start_server().await
}

async fn start_server() {
    let listener = TcpListener::bind("127.0.0.1:5678").await.unwrap();
   
    loop {
      let (stream,_) = listener.accept().await.unwrap();
      let (conn_reader,mut conn_writer) = conn::get_reader_writer(stream);

      //the date type transfer in channel is 'Msg'
      let (write_chan_sender,mut write_chan_receiver) = mpsc::channel::<orange_msgs::Msg>(10);
      
      //reading
      tokio::spawn(async move {
            process_reading(conn_reader,write_chan_sender).await;
      });

      //writing
      tokio::spawn(async move {
            while let Some(msg) = write_chan_receiver.recv().await {
                let vec_bytes: Vec<u8> = serialize_msg(&msg);
                conn_writer.write_message(vec_bytes).await.unwrap();
            }
      });
    }
}

async fn process_reading(mut conn_reader: ConnReader,write_chan_sender: Sender<orange_msgs::Msg>) {

    loop {
      match conn_reader.read_message().await {
            //got a message
            Ok(Some(vec_bytes)) => {
                let msg = deserialize_msg(&vec_bytes).unwrap();
                println!("@server: get message is : {:?}",msg);
                let msg_copy = msg.clone();
                /* message process here */
                match msg.body.unwrap() {
                  msgbody::Login(login) => println!("login = {:?}",login),
                  msgbody::LoginOk(loginok) => println!("login ok = {:?}",loginok),
                }
                write_chan_sender.send(msg_copy).await.unwrap();
            },
            
            //conn is closed!
            Ok(None) => {
                println!("conn closed!");
                return;
            },
            
            //line broken!
            Err(error) => {
                println!("read_message error: {:?}",error);
                return;
            }
      }
    }
}

好像很简单哈,看看build.rs:
fn main() {
    let mut config = prost_build::Config::new();
    config
      .out_dir("src")
      .compile_protos(&["src/orange_msgs.proto"], &["."])
      .unwrap();
}
cargo 以来包:

tokio = { version = "1", features = ["full"] }
bytes = "1"
prost = "0.11.6"


prost-build = "0.11.6"

最简单的.proto文件:
syntax = "proto3";

package orange.domain;

option java_outer_classname = "OrangeMsgs";

message User {
        string name=1;
        string level=2;
}

message Login {
    string name = 1;
    string password = 2;
}

message LoginOk {
        User user = 1;
}

//=========================================================

message Msg{
    oneof body {
      Login login = 1;
      LoginOk loginOk = 2;
    }
}
附带还得生成java的bat:
protoc --java_out=..\greentree\src\main\java src\orange_msgs.proto
好吧,大概这么多。
====================================
现在说说用rust的tokio之感受。
1,这里最大的问题就是std's api 和 tokio api的割裂——有种太子总想抢父皇皇位的感觉。
你用tokio的时候,时刻总得注意:我靠,我现在用的是谁的api?一个不注意就用了父皇的。
从概念上讲,tokio的范围显然是更大的——同步只是异步的一种特殊情况!!!
所以,你永远都可以只用tokio搞定任何问题,对吧?
当然了,这可能也没那么严重,毕竟用好use就行了。
2,还有一个问题,就是为什么tokio可以写的这么强?这么简洁的async/await。
我在睡不着的时候大概想了一下,如何实现。
我看他们应该是用宏实现的,使用了部分编译器的能力。async/await代码其实就是可以随时停住的代码。见到await要能停的住,还要在适当的时候继续。这就需要把要待执行的源码包好了,再分配。这种能力只有在编译器层面才做的到。而rust恰恰有这种宏,可以作为编译器的插件,做些事情。
3,最后一个问题,为什么java不能实现async/await?
参考问题2,因为java 没有宏!!!java的反射api也只是用最笨的方法载入类,java没有在编译器层面做事情的能力。所以,java才出现了netty这种玩意,这照tokio差远了。所以,才有了akka这种玩意,和golang的模型类似,用消息传输解决锁的问题。这东西和tokio比也不行。为什么呢?因为tokio比他们的实现更直接,且没损耗。
4,再附加一题,为什么不用C++?
我已经没精力学c++了。
==========================================
由于使用protobuf,客户端也变了。
这是scala3版的简单client:
package orange

import java.io.IOException
import java.nio.channels.SocketChannel
import com.google.protobuf.InvalidProtocolBufferException
import orange.conn.Conn
import orange.domain.OrangeMsgs

class Connection(socketChannel: SocketChannel):
    val conn = Conn(socketChannel)

    def readMessage(): Either] =
      conn.readMsg() match
            case Right(Some(bytes)) =>
                try
                  //here,the decode exception should catch
                  Right(Some(OrangeMsgs.Msg.parseFrom(bytes)))
                catch
                  case pbe: InvalidProtocolBufferException => Left(pbe)
            case Right(None) => Right(None)
            case Left(ioe) => Left(ioe)

    def writeMessage(msg: OrangeMsgs.Msg): Either =
      conn.writeMsg(msg.toByteArray())

    def close(): Either =
      try
            conn.socketChannel.close()
            Right(())
      catch
            case ioe: IOException => Left(ioe)

入口:
package orange

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import orange.domain.OrangeMsgs

object Client:
    val addr = "127.0.0.1"
    val port = 5678

    def main(args: Array): Unit =
      println("i am client...")
      start()
   
    def start(): Unit =
      val socketChannel = SocketChannel.open()
      socketChannel.connect(InetSocketAddress(addr,port))

      val connection = Connection(socketChannel)
      val login = OrangeMsgs.Login.newBuilder().setName("wen").setPassword("123")
      val msg = OrangeMsgs.Msg.newBuilder().setLogin(login).build()

      connection.writeMessage(msg)

      connection.readMessage() match
            case Right(Some(msg)) =>
                println("client got message:"+msg)
                import OrangeMsgs.Msg.BodyCase
                msg.getBodyCase() match
                  case BodyCase.LOGIN =>
                        val mylogin = msg.getLogin()
                        println(s"client mylogin:$mylogin")
                  case BodyCase.LOGINOK =>
                        val myloginok = msg.getLoginOk()
                        println(s"client myloginok:$myloginok")
                  case BodyCase.BODY_NOT_SET => println("***body not set!***")

            case Right(None) => println("line broken!")
            case Left(e) => println("error happened:"+e)

      connection.close()

conn就还是使用前面的那个,一样的。
需要注意的是,protobuf的api,比我原来想像的要简单,用过后自有感受。
页: [1]
查看完整版本: rust 版 server,use protobuf