grpc 初体验
之前很多次看到 grpc ,但一直都机会没有投产使用过,最近机会终于来了,我们项目需要用grpc了,我是对接者,也就是grpc 的客户端,下面就来说说我的流程吧首先我去看了挺多关于protobuf 和grpc 和rpc 相关的知识,就是扫盲吧。
1.编写proto文件,其实这个文件是服务端那边定的,就像http接口一样,基本都是服务端来定义接口参数和格式,因此服务端把他的proto文件给我了,我点开一看,里面有很多东西,其中有很多的部分都是我不需要的,因为他还要给别的服务提供grpc接口,他全部写在一个文件里了(我认为最好的做法是分开,分别给多个客户端他自己的proto文件),虽然他说不用管其他的部分,但是我还是只想要我会用的,因此我删掉了很多
文件基本内容:
定义服务:服务的概念就是一组接口,相当于一个结构体或者类吧,后面要调用接口时会用到这个,在服务里定义几个接口,一般情况下,接口入参是一个接口体,接口出参也是一个结构体
定义服务的时候有一个很重要的标识:stream, 如果 stream 在入参前面则表示客户端流式rpc,stream 在出参前面表示服务端流式rpc,如果两个都有,则表示双向流式rpc,如果都没有表示简单rpc,这个东西对长连接很有用,后面能看到
我们是定义了一个服务端流式rpc
service Linker {
rpc 接口1(SubState) returns (stream ReportData) {}
}
服务必须用service 来定义
2.定义message,也就是入参出参的结构体,包含类型和id
message SubState {
string consumer_group = 2;
string client_id= 3;
}
message ReportData {
string cabinet_no = 1; //依次是字段类型,字段名,id
string topic = 2;
string content = 3;
string id = 4;
}
结构体必须用message 来定义
为什么要定义id的值为数字呢,这是因为 rpc 数据传输并不是使用字段名的,是使用我们定义的这个id值,这样可以提高传输效率,字段名是给我们人看的,id的值直接按顺序写下来就好
3.生成关键代码,因为 protobuf 是不区分编程语言的数据结构,因此需要用专门的工具来生成特定语言的代码,这个就是 protoc,protoc 安装就不赘述了,直接上命令
protoc --go_out=../ --go-grpc_out=../*.proto
第一个选项是生成消息序列化代码,grpc 的客户端和服务端都会使用到这里的代码,不要改
第二个选项是生成 grpc 的代码,包含客户端和服务端的代码
4.调用接口
package mainimport ( service "grpc-c/service")func main() { service.ConsumerData()}func ConsumerData() error { defer func() {//防止首次连接时 panic if r := recover(); r != nil { fmt.Println("grpc 连接错误!!!") time.Sleep(3 * time.Second) fmt.Println("休息一下,马上连接...") ConsumerData() } }() conn, err := grpc.Dial("localhost:6666", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Println(err.Error()) return err } defer conn.Close() //服务端流模式 c := pb.NewLinkerClient(conn) ctx, _ := context.WithCancel(context.Background()) stream, err := c.SubCabMessage(ctx, &pb.SubState{ConsumerGroup: "111", ClientId: "222"}) if err != nil { log.Println("订阅grpc err", err.Error()) } for { a, err := stream.Recv() if err == io.EOF { return nil } if err != nil { if strings.Contains(err.Error(), "Unavailable") { //防止运行过程中,服务端断线引起的 panic,则客户端重连 ConsumerData() } continue } log.Printf("handleMessage:%+v,处理完毕", a) }}
第一步:建立连接
第二步:创建客户端
第三步:调用接口
第四步:循环获取数据
如果不是服务端流式 rpc 就调用一次就能获取结果,我们这个是服务端流式rpc ,所以得循环获取,实现了一次连接,服务端多次推送数据的效果
关键的关键,实现了客户端断线重连
仔细看一下返回的数据,都是之前proto 文件里定义好的结构体
[图片上传中...(image.png-bbf33-1678095393319-0)]
仔细看一下 *_grpc.pb.go grpc 文件代码,就能发现很多东西,比如客户端怎么从接口获取数据
grpc 客户端就这么完成了
grpc 服务端端
package mainimport ( "fmt" "net" pb "rpc/pb" "rpc/service" "google.golang.org/grpc")func main() { server := grpc.NewServer() pb.RegisterLinkerServer(server, service.LinkerServer{}) lis, err := net.Listen("tcp", ":6666") if err != nil { fmt.Println("err:", err.Error()) } go service.LocalCall() //模拟服务端发送数据时刻,比如说由http接口触发grpc服务端发数据给grpc客户端 err = server.Serve(lis) if err != nil { fmt.Println("err:", err.Error()) }}package serviceimport ( "fmt" "log" pb "rpc/pb" "time")type LinkerServer struct { pb.UnimplementedLinkerServer}var reschan = make(chan *pb.ReportData)func (s LinkerServer) SubCabMessage(sub_state *pb.SubState, stream pb.Linker_SubCabMessageServer) error { for data := range reschan { err := stream.Send(data) if err != nil { log.Println("发送grpc err", err.Error()) return err } } return nil}func LocalCall() { n := 0 for { fmt.Println("n:", n) server := pb.ReportData{ CabinetNo: fmt.Sprintf("chenjuan%d", n), Topic: "test", Content: "{111}", Id: "iddddd", } reschan <- &server time.Sleep(5 * time.Second) n++ }}这个流程是由另一个http接口触发给grpc客户端发数据,并不是grpc客户端发起,由于grpc接口无法由本地调用,因此我暂时只能将要发送的数据通过channel 发送给SubCabMessage这个接口
页:
[1]