|
衔接上一篇。
这是我的rust版server。
基础支持,conn.rs:
use bytes::{Buf,BytesMut};
use std::io::{self,Cursor};
use tokio::io::{AsyncReadExt,AsyncWriteExt,ReadHalf,WriteHalf,BufWriter};
use tokio::net::TcpStream;
/// Size in bytes of the length prefix (one `u32`) that precedes every message body.
const HEAD_LEN: u32 = 4;
/// Outcome of one attempt to cut a complete message out of the read buffer.
#[derive(Debug)]
enum MyParse<T> {
// Not enough bytes are buffered yet; the caller has to read more data.
InComplete,
// A complete message was extracted from the buffer.
Got(T),
}
/// Sending side of a connection: the write half of a split `TcpStream`
/// behind a `BufWriter`.
#[derive(Debug)]
pub struct ConnWriter {
// Buffered write half; `write_message` flushes it after each message.
writer: BufWriter<WriteHalf<TcpStream>>,
}
impl ConnWriter {
    /// Wraps the write half of a split `TcpStream` in a buffered writer.
    pub fn new(wr: WriteHalf<TcpStream>) -> Self {
        ConnWriter {
            writer: BufWriter::new(wr),
        }
    }

    /// Sends one framed message: a 4-byte big-endian length prefix, then the
    /// body, then a flush so the frame actually leaves the buffer.
    ///
    /// # Errors
    /// Returns `InvalidInput` when the body does not fit in the `u32` length
    /// prefix, and propagates every I/O error from the socket instead of
    /// panicking.
    pub async fn write_message(&mut self, vec_bytes: Vec<u8>) -> io::Result<()> {
        // Refuse bodies the header cannot describe rather than truncating the length.
        if vec_bytes.len() > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "message body does not fit in a u32 length prefix",
            ));
        }
        self.writer.write_u32(vec_bytes.len() as u32).await?;
        // `write` may accept only part of the slice; `write_all` loops until
        // the whole body has been handed over.
        self.writer.write_all(&vec_bytes).await?;
        self.writer.flush().await
    }
}
/// Receiving side of a connection: the read half of a split `TcpStream`
/// plus the bytes read so far that have not yet been returned as a message.
#[derive(Debug)]
pub struct ConnReader {
// Read half of the split stream.
reader: ReadHalf<TcpStream>,
// Accumulates received bytes until at least one whole frame is available.
buffer: BytesMut,
}
impl ConnReader {
    /// Wraps the read half of a split `TcpStream` with a 4 KiB initial buffer.
    pub fn new(rd: ReadHalf<TcpStream>) -> ConnReader {
        ConnReader {
            reader: rd,
            buffer: BytesMut::with_capacity(4 * 1024),
        }
    }

    /// Reads the next length-prefixed message from the connection.
    ///
    /// Returns `Ok(Some(body))` for a complete message and `Ok(None)` when the
    /// peer closed the connection with no partial frame left in the buffer.
    ///
    /// # Errors
    /// Propagates socket read errors, and fails when the stream ends while a
    /// partial frame is still buffered.
    pub async fn read_message(&mut self) -> crate::Result<Option<Vec<u8>>> {
        loop {
            // Try the buffered bytes first; only touch the socket when they
            // do not yet hold a whole frame.
            if let MyParse::Got(vec_bytes) = self.parse_message()? {
                return Ok(Some(vec_bytes));
            }
            // A read of 0 bytes means the peer closed the stream.
            if 0 == self.reader.read_buf(&mut self.buffer).await? {
                return if self.buffer.is_empty() {
                    Ok(None)
                } else {
                    Err("read 0 bytes and connection reset by peer!".into())
                };
            }
        }
    }

    /// Tries to cut one complete frame (4-byte big-endian length + body) off
    /// the front of the buffer.
    ///
    /// Returns `MyParse::InComplete` and leaves the buffer untouched when the
    /// header or the body is not fully buffered yet.
    fn parse_message(&mut self) -> crate::Result<MyParse<Vec<u8>>> {
        let head_len = HEAD_LEN as usize;
        // `>=` semantics: a frame with an empty body is exactly `head_len`
        // bytes long and must be accepted as soon as the header is complete.
        if self.buffer.len() < head_len {
            return Ok(MyParse::InComplete);
        }
        let mut cursor = Cursor::new(&self.buffer[..]);
        let body_len = cursor.get_u32() as usize;
        if cursor.remaining() < body_len {
            return Ok(MyParse::InComplete);
        }
        // Sum the frame length in `usize`, so it cannot wrap around in u32.
        let frame_len = head_len + body_len;
        let body = self.buffer[head_len..frame_len].to_vec();
        self.buffer.advance(frame_len);
        Ok(MyParse::Got(body))
    }
}
/// Splits `stream` into its two halves and wraps them as a
/// (`ConnReader`, `ConnWriter`) pair.
pub fn get_reader_writer(stream: TcpStream) -> (ConnReader, ConnWriter) {
    let (read_half, write_half) = tokio::io::split(stream);
    (ConnReader::new(read_half), ConnWriter::new(write_half))
}
主入口main.rs:
pub mod conn;
// Include the `items` module, which is generated from items.proto.
pub mod orange_msgs {
include!(&#34;orange.domain.rs&#34;);
}
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self,Sender};
use std::io::Cursor;
use prost::Message;
use crate::conn::ConnReader;
//use orange_msgs::msg::Type as msgtype;
use orange_msgs::msg::Body as msgbody;
/// Crate-wide boxed error type; `Send + Sync` so it can be used inside spawned tasks.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
/// Crate-wide result alias that uses [`Error`] as the error type.
pub type Result<T> = std::result::Result<T,Error>;
/// Encodes `msg` into a freshly allocated byte vector (protobuf wire format).
pub fn serialize_msg(msg: &orange_msgs::Msg) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(msg.encoded_len());
    // The vector already has room for the whole message, so encoding cannot
    // fail for lack of capacity and the unwrap is safe.
    msg.encode(&mut encoded).unwrap();
    encoded
}
/// Decodes a `Msg` from the bytes in `buf`.
///
/// # Errors
/// Returns the boxed decode error when `buf` is not a valid encoded `Msg`.
pub fn deserialize_msg(buf: &Vec<u8>) -> Result<orange_msgs::Msg> {
    let mut cursor = Cursor::new(buf);
    // `?` boxes the decode error into the crate-wide `Error` type.
    Ok(orange_msgs::Msg::decode(&mut cursor)?)
}
/// Process entry point: runs the server on the tokio runtime.
#[tokio::main]
async fn main() {
    start_server().await;
}
async fn start_server() {
let listener = TcpListener::bind(&#34;127.0.0.1:5678&#34;).await.unwrap();
loop {
let (stream,_) = listener.accept().await.unwrap();
let (conn_reader,mut conn_writer) = conn::get_reader_writer(stream);
//the date type transfer in channel is &#39;Msg&#39;
let (write_chan_sender,mut write_chan_receiver) = mpsc::channel::<orange_msgs::Msg>(10);
//reading
tokio::spawn(async move {
process_reading(conn_reader,write_chan_sender).await;
});
//writing
tokio::spawn(async move {
while let Some(msg) = write_chan_receiver.recv().await {
let vec_bytes: Vec<u8> = serialize_msg(&msg);
conn_writer.write_message(vec_bytes).await.unwrap();
}
});
}
}
/// Reads messages from one connection until it closes or fails, prints each
/// one and sends it to the writer task through `write_chan_sender`.
///
/// Returns (ending the task) when the peer closes the connection, a read or
/// decode error occurs, or the writer side of the channel is gone. Data from
/// the peer never makes this task panic.
async fn process_reading(mut conn_reader: ConnReader, write_chan_sender: Sender<orange_msgs::Msg>) {
    loop {
        match conn_reader.read_message().await {
            // got a message
            Ok(Some(vec_bytes)) => {
                let msg = match deserialize_msg(&vec_bytes) {
                    Ok(msg) => msg,
                    // Undecodable bytes from the peer: drop the connection.
                    Err(error) => {
                        println!("deserialize_msg error: {:?}", error);
                        return;
                    }
                };
                println!("@server: get message is : {:?}", msg);
                /* message process here */
                // Match by reference so the message can be forwarded
                // afterwards without cloning it.
                match &msg.body {
                    Some(msgbody::Login(login)) => println!("login = {:?}", login),
                    Some(msgbody::LoginOk(loginok)) => println!("login ok = {:?}", loginok),
                    // A message may legally arrive with no oneof alternative set.
                    None => println!("message body not set!"),
                }
                // The send fails only when the writer task has ended; there
                // is then nobody left to answer, so stop reading.
                if write_chan_sender.send(msg).await.is_err() {
                    println!("writer task closed, stop reading!");
                    return;
                }
            }
            // conn is closed!
            Ok(None) => {
                println!("conn closed!");
                return;
            }
            // line broken!
            Err(error) => {
                println!("read_message error: {:?}", error);
                return;
            }
        }
    }
}
好像很简单哈,看看build.rs:
fn main() {
let mut config = prost_build::Config::new();
config
.out_dir(&#34;src&#34;)
.compile_protos(&[&#34;src/orange_msgs.proto&#34;], &[&#34;.&#34;])
.unwrap();
}
cargo 依赖包:
# Runtime dependencies of the server.
[dependencies]
tokio = { version = "1", features = ["full"] }
bytes = "1"
prost = "0.11.6"

# Used only by build.rs to generate Rust code from the .proto file.
[build-dependencies]
prost-build = "0.11.6"
最简单的.proto文件:
syntax = "proto3";
// The package name determines the generated Rust file name (orange.domain.rs).
package orange.domain;
option java_outer_classname = "OrangeMsgs";
// A user record; embedded in LoginOk.
message User {
string name=1;
string level=2;
}
// Login credentials (name and password); the sample client sends this as the Msg body.
message Login {
string name = 1;
string password = 2;
}
// Carries a User; presumably the reply to a successful Login (the sample server only echoes).
message LoginOk {
User user = 1;
}
//=========================================================
// Envelope for everything sent over the wire: at most one alternative of `body` is set.
message Msg{
oneof body {
Login login = 1;
LoginOk loginOk = 2;
}
}
附带还得生成java的bat:
rem Generate the Java classes from src\orange_msgs.proto into ..\greentree\src\main\java
protoc --java_out=..\greentree\src\main\java src\orange_msgs.proto
好吧,大概这么多。
====================================
现在说说用rust的tokio之感受。
1,这里最大的问题就是std's api 和 tokio api的割裂——有种太子总想抢父皇皇位的感觉。
你用tokio的时候,时刻总得注意:我靠,我现在用的是谁的api?一个不注意就用了父皇的。
从概念上讲,tokio的范围显然是更大的——同步只是异步的一种特殊情况!!!
所以,你永远都可以只用tokio搞定任何问题,对吧?
当然了,这可能也没那么严重,毕竟用好use就行了。
2,还有一个问题,就是为什么tokio可以写的这么强?这么简洁的async/await。
我在睡不着的时候大概想了一下,如何实现。
我看他们应该是用宏实现的,使用了部分编译器的能力。async/await代码其实就是可以随时停住的代码。见到await要能停的住,还要在适当的时候继续。这就需要把要待执行的源码包好了,再分配。这种能力只有在编译器层面才做的到。而rust恰恰有这种宏,可以作为编译器的插件,做些事情。
3,最后一个问题,为什么java不能实现async/await?
参考问题2,因为java 没有宏!!!java的反射api也只是用最笨的方法载入类,java没有在编译器层面做事情的能力。所以,java才出现了netty这种玩意,这照tokio差远了。所以,才有了akka这种玩意,和golang的模型类似,用消息传输解决锁的问题。这东西和tokio比也不行。为什么呢?因为tokio比他们的实现更直接,且没损耗。
4,再附加一题,为什么不用C++?
我已经没精力学c++了。
==========================================
由于使用protobuf,客户端也变了。
这是scala3版的简单client:
package orange
import java.io.IOException
import java.nio.channels.SocketChannel
import com.google.protobuf.InvalidProtocolBufferException
import orange.conn.Conn
import orange.domain.OrangeMsgs
/** Message-level wrapper around a `Conn`: frames go in and out as `OrangeMsgs.Msg`. */
class Connection(socketChannel: SocketChannel):
  val conn = Conn(socketChannel)

  /** Reads one frame and decodes it as a `Msg`.
    *
    * Passes `Left` and `Right(None)` from `conn.readMsg()` through unchanged and
    * returns `Left` with the protobuf exception when the frame cannot be decoded.
    */
  def readMessage(): Either[Exception,Option[OrangeMsgs.Msg]] =
    conn.readMsg() match
      case Left(ioe) => Left(ioe)
      case Right(None) => Right(None)
      case Right(Some(bytes)) =>
        // the decode exception must be caught here
        try Right(Some(OrangeMsgs.Msg.parseFrom(bytes)))
        catch case pbe: InvalidProtocolBufferException => Left(pbe)

  /** Serializes `msg` and writes it as one frame. */
  def writeMessage(msg: OrangeMsgs.Msg): Either[IOException,Unit] =
    conn.writeMsg(msg.toByteArray())

  /** Closes the underlying channel, returning an `IOException` as `Left`. */
  def close(): Either[IOException,Unit] =
    try
      conn.socketChannel.close()
      Right(())
    catch
      case ioe: IOException => Left(ioe)
入口:
package orange
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import orange.domain.OrangeMsgs
/** Minimal client: connects, sends one Login message and prints the reply. */
object Client:
  val addr = "127.0.0.1"
  val port = 5678

  def main(args: Array[String]): Unit =
    println("i am client...")
    start()

  /** Runs one request/reply exchange against the server at `addr:port`.
    *
    * The reply is only awaited when the request was actually written, and the
    * connection is closed even if the exchange throws.
    */
  def start(): Unit =
    val socketChannel = SocketChannel.open()
    socketChannel.connect(InetSocketAddress(addr,port))
    val connection = Connection(socketChannel)
    try
      val login = OrangeMsgs.Login.newBuilder().setName("wen").setPassword("123")
      val msg = OrangeMsgs.Msg.newBuilder().setLogin(login).build()
      connection.writeMessage(msg) match
        // Do not block on a reply to a request that never went out.
        case Left(e) => println("error happened:"+e)
        case Right(_) =>
          connection.readMessage() match
            case Right(Some(reply)) =>
              println("client got message:"+reply)
              import OrangeMsgs.Msg.BodyCase
              reply.getBodyCase() match
                case BodyCase.LOGIN =>
                  val mylogin = reply.getLogin()
                  println(s"client mylogin:$mylogin")
                case BodyCase.LOGINOK =>
                  val myloginok = reply.getLoginOk()
                  println(s"client myloginok:$myloginok")
                case BodyCase.BODY_NOT_SET => println("***body not set!***")
            case Right(None) => println("line broken!")
            case Left(e) => println("error happened:"+e)
    finally
      connection.close()
conn就还是使用前面的那个,一样的。
需要注意的是,protobuf的api,比我原来想像的要简单,用过后自有感受。 |
|