找回密码
 立即注册
查看: 225|回复: 0

gRpc 客户端调用服务接口

[复制链接]
发表于 2022-12-1 18:07 | 显示全部楼层 |阅读模式
gRPC 客户端调用服务接口

不依赖protobuf生成的代码文件进行调用,可以通过反射接口进行调用。但需要Server端提供io.grpc.reflection.v1alpha.ServerReflectionGrpc 服务,用于获取服务的描述文件。

大致流程:
    根据方法名称,调用服务端反射服务的方法,获取方法所在proto文件根据proto描述文件,获取文件描述、服务描述,用于重新构建被调用方法的方法描述MethodDescriptor根据方法描述,将请求内容序列化为对应的类型使用重新构建的MethodDescriptor和其他参数对Server端相应的方法发起调用解析响应并返回
实现

1. proto文件定义 hello.proto

syntax = "proto3";option java_multiple_files = true;option java_package = "com.haust.hello";option java_outer_classname = "HelloProto";package com.haust.hello;service HelloService {  rpc SayHello (HelloRequest) returns (HelloReply) {}}message HelloRequest {  string message = 1;}message HelloReply {  string message =  1;}2. 构建反射服务

package com.haust.grpc.myrpc;import com.google.protobuf.*;import com.google.protobuf.util.JsonFormat;import io.grpc.CallOptions;import io.grpc.ManagedChannel;import io.grpc.ManagedChannelBuilder;import io.grpc.MethodDescriptor;import io.grpc.protobuf.ProtoUtils;import io.grpc.reflection.v1alpha.ServerReflectionGrpc;import io.grpc.reflection.v1alpha.ServerReflectionRequest;import io.grpc.reflection.v1alpha.ServerReflectionResponse;import io.grpc.stub.ClientCalls;import io.grpc.stub.StreamObserver;import java.util.List;import java.util.Map;import java.util.Objects;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;public class MyGrpc {    public static void main(String[] args) throws Exception {        // 请求方法        String methodSymbol = "com.haust.hello.HelloService.SayHello";        // 请求内容        String requestContext = "{\"message\":\"this is request\"}";        // 构建channel        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();        // 使用channel 构建Stub 创建一个新的异步的stub        ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);        // 响应观察器        // 需要Server端 加入ProtoReflectionService.newInstance()        StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {            @Override            public void onNext(ServerReflectionResponse serverReflectionResponse) {                // 处理响应                try {                    if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {                        List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();                        // requestContent 请求内容                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);                    } else {                        System.out.println("响应处理失败" + serverReflectionResponse.getMessageResponseCase());                    }                } catch (Exception e) {                    e.printStackTrace();                }            }            @Override            public void onError(Throwable throwable) {                throwable.printStackTrace();            }            @Override            public void onCompleted() {                System.out.println("completed!");            }        };        // 请求观察器        StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);        // 根据方法名称获取文件描述请求        ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();        requestStreamObserver.onNext(getFileContainingSymbolRequest);        channel.awaitTermination(10, TimeUnit.SECONDS);    }    // 在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用    private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {        try {            // 解析方法和服务名称            String fullName = extraPrefix(methodFullName);            String methodName = extraSuffix(methodFullName);            String packageName = extraPrefix(fullName);            String serviceName = extraSuffix(fullName);            // 根据响应解析 FileDescriptor            Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);            // 查找服务描述            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);            // 查找方法描述            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);            // 发起请求            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);        } catch (Exception e) {            e.printStackTrace();        }    }    // 根据响应找到方法对应的文件的FileDescriptorProto ,然后构建出对应的 FileDescriptor    private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,                                                                String packageName,                                                                String serviceName) throws Exception {        Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =                fileDescriptorProtoList.stream()                        .map(bs -> {                            try {                                return DescriptorProtos.FileDescriptorProto.parseFrom(bs);                            } catch (InvalidProtocolBufferException e) {                                e.printStackTrace();                            }                            return null;                        })                        .filter(Objects::nonNull)                        .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));        if (fileDescriptorProtoMap.isEmpty()) {            System.out.println("服务不存在");            throw new IllegalArgumentException("方法的文件描述不存在");        }        // 查找服务对应的 Proto 描述        DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);        // 获取这个 Proto 的依赖        Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);        // 生成 Proto 的 FileDescriptor        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);    }    // 根据包名查找响应的文件描述    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {        for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {            if (proto.getPackage().equals(packageName)) {                boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));                if (exist) {                    return proto;                }            }        }        throw new IllegalArgumentException("服务不存在");    }    // 获取依赖类型    private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {        return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)                .map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);    }    // 将FileDescriptorProto转为Descriptor    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {        try {            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);        } catch (Descriptors.DescriptorValidationException e) {            e.printStackTrace();        }        return null;    }    // 获取前缀    private static String extraPrefix(String context) {        int index = context.lastIndexOf(".");        return context.substring(0, index);    }    // 获取后缀    private static String extraSuffix(String context) {        int index = context.lastIndexOf(".");        return context.substring(index + 1);    }    // 执行方法调用    private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {        // 重新生成MethodDescriptor        MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);        CallOptions callOptions = CallOptions.DEFAULT;        TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();        // 将请求内容诸位响应的类型        JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);        DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());        parser.merge(requestContext, messageBuilder);        DynamicMessage requestMessage = messageBuilder.build();        // 调用, 调用方可以通过originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推断        DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);        // 将响应解析为JSON字符串        JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();        String responseContent = printer.print(response);        System.out.println("响应 responseContent: " + responseContent);    }    //格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor    private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {        // 生成方法全名        String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());        // 请求和响应类型        MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())                .buildPartial());        MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())                .buildPartial());        // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确        return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()                .setFullMethodName(fullMethodName)                .setRequestMarshaller(inputTypeMarshaller)                .setResponseMarshaller(outputTypeMarshaller)                // 使用 UNKNOWN,自动修改                .setType(MethodDescriptor.MethodType.UNKNOWN)                .build();    }}Server端 服务实现

package com.haust.grpc.server;import io.grpc.Server;import io.grpc.netty.NettyServerBuilder;import io.grpc.protobuf.services.ProtoReflectionService;import java.util.concurrent.TimeUnit;public class ReflectionServer {    public static void main(String[] args) {        Server server = NettyServerBuilder.forPort(9921).addService(new HelloGrpcImpl())                .addService(ProtoReflectionService.newInstance()).build();        try {            server.start();            Runtime.getRuntime().addShutdownHook(new Thread(() -> {                try {                    server.awaitTermination(10, TimeUnit.SECONDS);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }));            server.awaitTermination();        } catch (Exception e) {            e.printStackTrace();        }    }}当然一般情况下,在公司的grpc服务中,是不可以开启反射的,此情况仅适用于开发环境
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-24 16:55 , Processed in 0.065088 second(s), 22 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表