找回密码
 立即注册
查看: 227|回复: 0

rust 版 server,use protobuf

[复制链接]
发表于 2023-2-1 13:06 | 显示全部楼层 |阅读模式
衔接上一篇。
这是我的rust版server。
基础支持,conn.rs:
use bytes::{Buf,BytesMut};
use std::io::{self,Cursor};
use tokio::io::{AsyncReadExt,AsyncWriteExt,ReadHalf,WriteHalf,BufWriter};
use tokio::net::TcpStream;

const HEAD_LEN: u32 = 4;

#[derive(Debug)]
enum MyParse<T> {
    InComplete,
    Got(T),
}

#[derive(Debug)]
pub struct ConnWriter {
    writer: BufWriter<WriteHalf<TcpStream>>,
}

impl ConnWriter {
    pub fn new(wr: WriteHalf<TcpStream>) -> Self {
        ConnWriter {
            writer: BufWriter::<WriteHalf<TcpStream>>::new(wr),
        }
    }
   
    pub async fn write_message(&mut self,vec_bytes: Vec<u8>) -> io::Result<()> {
        self.writer.write_u32(vec_bytes.len() as u32).await.unwrap();
        self.writer.write(&vec_bytes).await.unwrap();
        self.writer.flush().await.unwrap();
        Ok(())
    }
}

#[derive(Debug)]
pub struct ConnReader {
    reader: ReadHalf<TcpStream>,
    buffer: BytesMut,
}

impl ConnReader {
    pub fn new(rd: ReadHalf<TcpStream>) -> ConnReader {
        ConnReader {
            reader: rd,                                                                                          
            buffer: BytesMut::with_capacity(4*1024),
        }
    }
   
    pub async fn read_message(&mut self) -> crate::Result<Option<Vec<u8>>> {
        loop {
            match self.parse_message() {
                Ok(MyParse::Got(vec_bytes)) => return Ok(Some(vec_bytes)),
                Ok(MyParse::InComplete) => {}, //we need more data!
                Err(err) => return Err(err.into()),
            }
            
            //when exception,return Err(error).
            if 0 == self.reader.read_buf(&mut self.buffer).await? {
                if self.buffer.is_empty() {
                    return Ok(None);
                } else {
                    return Err("read 0 bytes and connection reset by peer!".into());
                }                                                                                                         
            }
        }
    }

    fn parse_message(&mut self) -> crate::Result<MyParse<Vec<u8>>> {
        let mut cursor = Cursor::new(&self.buffer[..]);
        if cursor.remaining() > HEAD_LEN as usize {
            let body_len: u32 = cursor.get_u32();
            if cursor.remaining() >= body_len as usize {
                let body_bytes = cursor.copy_to_bytes(body_len as usize);
                self.buffer.advance((HEAD_LEN + body_len) as usize);
                Ok(MyParse::Got(body_bytes.to_vec()))
            } else {
                Ok(MyParse::InComplete)
            }
        } else {
            Ok(MyParse::InComplete)
        }
    }
   
}

pub fn get_reader_writer(stream: TcpStream) -> (ConnReader,ConnWriter) {
    let (rd,wr) = tokio::io::split(stream);
    let conn_reader = ConnReader::new(rd);
    let conn_writer = ConnWriter::new(wr);
    (conn_reader,conn_writer)
}

主入口main.rs:
pub mod conn;
// Include the `items` module, which is generated from items.proto.
pub mod orange_msgs {
    include!("orange.domain.rs");
}

use tokio::net::TcpListener;
use tokio::sync::mpsc::{self,Sender};
use std::io::Cursor;
use prost::Message;
use crate::conn::ConnReader;
//use orange_msgs::msg::Type as msgtype;
use orange_msgs::msg::Body as msgbody;

pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T,Error>;

pub fn serialize_msg(msg: &orange_msgs::Msg) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.reserve(msg.encoded_len());
    // Unwrap is safe, since we have reserved sufficient capacity in the vector.
    msg.encode(&mut buf).unwrap();
    buf
}

pub fn deserialize_msg(buf: &Vec<u8>) -> Result<orange_msgs::Msg> {
    match orange_msgs::Msg::decode(&mut Cursor::new(buf)) {
        Ok(msg) => Ok(msg),
        Err(e) => Err(Box::new(e)),
    }
}

#[tokio::main]
async fn main(){
    start_server().await
}

async fn start_server() {
    let listener = TcpListener::bind("127.0.0.1:5678").await.unwrap();
   
    loop {
        let (stream,_) = listener.accept().await.unwrap();
        let (conn_reader,mut conn_writer) = conn::get_reader_writer(stream);

        //the date type transfer in channel is 'Msg'
        let (write_chan_sender,mut write_chan_receiver) = mpsc::channel::<orange_msgs::Msg>(10);
        
        //reading
        tokio::spawn(async move {
            process_reading(conn_reader,write_chan_sender).await;
        });

        //writing
        tokio::spawn(async move {
            while let Some(msg) = write_chan_receiver.recv().await {
                let vec_bytes: Vec<u8> = serialize_msg(&msg);
                conn_writer.write_message(vec_bytes).await.unwrap();
            }
        });
    }
}

async fn process_reading(mut conn_reader: ConnReader,write_chan_sender: Sender<orange_msgs::Msg>) {

    loop {
        match conn_reader.read_message().await {
            //got a message
            Ok(Some(vec_bytes)) => {
                let msg = deserialize_msg(&vec_bytes).unwrap();
                println!("@server: get message is : {:?}",msg);
                let msg_copy = msg.clone();
                /* message process here */
                match msg.body.unwrap() {
                    msgbody::Login(login) => println!("login = {:?}",login),
                    msgbody::LoginOk(loginok) => println!("login ok = {:?}",loginok),
                }
                write_chan_sender.send(msg_copy).await.unwrap();
            },
            
            //conn is closed!
            Ok(None) => {
                println!("conn closed!");
                return;
            },
            
            //line broken!
            Err(error) => {
                println!("read_message error: {:?}",error);
                return;
            }
        }
    }
}

好像很简单哈,看看build.rs:
fn main() {
    let mut config = prost_build::Config::new();
    config
        .out_dir("src")
        .compile_protos(&["src/orange_msgs.proto"], &["."])
        .unwrap();
}
cargo 以来包:
[dependencies]
tokio = { version = "1", features = ["full"] }
bytes = "1"
prost = "0.11.6"

[build-dependencies]
prost-build = "0.11.6"

最简单的.proto文件:
syntax = "proto3";

package orange.domain;

option java_outer_classname = "OrangeMsgs";

message User {
        string name=1;
        string level=2;
}

message Login {
    string name = 1;
    string password = 2;
}

message LoginOk {
        User user = 1;
}

//=========================================================

message Msg{
    oneof body {
        Login login = 1;
        LoginOk loginOk = 2;
    }
}
附带还得生成java的bat:
protoc --java_out=..\greentree\src\main\java src\orange_msgs.proto
好吧,大概这么多。
====================================
现在说说用rust的tokio之感受。
1,这里最大的问题就是std's api 和 tokio api的割裂——有种太子总想抢父皇皇位的感觉。
你用tokio的时候,时刻总得注意:我靠,我现在用的是谁的api?一个不注意就用了父皇的。
从概念上讲,tokio的范围显然是更大的——同步只是异步的一种特殊情况!!!
所以,你永远都可以只用tokio搞定任何问题,对吧?
当然了,这可能也没那么严重,毕竟用好use就行了。
2,还有一个问题,就是为什么tokio可以写的这么强?这么简洁的async/await。
我在睡不着的时候大概想了一下,如何实现。
我看他们应该是用宏实现的,使用了部分编译器的能力。async/await代码其实就是可以随时停住的代码。见到await要能停的住,还要在适当的时候继续。这就需要把要待执行的源码包好了,再分配。这种能力只有在编译器层面才做的到。而rust恰恰有这种宏,可以作为编译器的插件,做些事情。
3,最后一个问题,为什么java不能实现async/await?
参考问题2,因为java 没有宏!!!java的反射api也只是用最笨的方法载入类,java没有在编译器层面做事情的能力。所以,java才出现了netty这种玩意,这照tokio差远了。所以,才有了akka这种玩意,和golang的模型类似,用消息传输解决锁的问题。这东西和tokio比也不行。为什么呢?因为tokio比他们的实现更直接,且没损耗。
4,再附加一题,为什么不用C++?
我已经没精力学c++了。
==========================================
由于使用protobuf,客户端也变了。
这是scala3版的简单client:
package orange

import java.io.IOException
import java.nio.channels.SocketChannel
import com.google.protobuf.InvalidProtocolBufferException
import orange.conn.Conn
import orange.domain.OrangeMsgs

class Connection(socketChannel: SocketChannel):
    val conn = Conn(socketChannel)

    def readMessage(): Either[Exception,Option[OrangeMsgs.Msg]] =
        conn.readMsg() match
            case Right(Some(bytes)) =>
                try
                    //here,the decode exception should catch
                    Right(Some(OrangeMsgs.Msg.parseFrom(bytes)))
                catch
                    case pbe: InvalidProtocolBufferException => Left(pbe)
            case Right(None) => Right(None)
            case Left(ioe) => Left(ioe)

    def writeMessage(msg: OrangeMsgs.Msg): Either[IOException,Unit] =
        conn.writeMsg(msg.toByteArray())

    def close(): Either[IOException,Unit] =
        try
            conn.socketChannel.close()
            Right(())
        catch
            case ioe: IOException => Left(ioe)

入口:
package orange

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import orange.domain.OrangeMsgs

object Client:
    val addr = "127.0.0.1"
    val port = 5678

    def main(args: Array[String]): Unit =
        println("i am client...")
        start()
   
    def start(): Unit =
        val socketChannel = SocketChannel.open()
        socketChannel.connect(InetSocketAddress(addr,port))

        val connection = Connection(socketChannel)
        val login = OrangeMsgs.Login.newBuilder().setName("wen").setPassword("123")
        val msg = OrangeMsgs.Msg.newBuilder().setLogin(login).build()

        connection.writeMessage(msg)

        connection.readMessage() match
            case Right(Some(msg)) =>
                println("client got message:"+msg)
                import OrangeMsgs.Msg.BodyCase
                msg.getBodyCase() match
                    case BodyCase.LOGIN =>
                        val mylogin = msg.getLogin()
                        println(s"client mylogin:$mylogin")
                    case BodyCase.LOGINOK =>
                        val myloginok = msg.getLoginOk()
                        println(s"client myloginok:$myloginok")
                    case BodyCase.BODY_NOT_SET => println("***body not set!***")

            case Right(None) => println("line broken!")
            case Left(e) => println("error happened:"+e)

        connection.close()

conn就还是使用前面的那个,一样的。
需要注意的是,protobuf的api,比我原来想像的要简单,用过后自有感受。
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 09:23 , Processed in 0.090984 second(s), 25 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表