Arzie100 发表于 2022-12-1 18:07

gRpc 客户端调用服务接口

gRPC 客户端调用服务接口

不依赖protobuf生成的代码文件进行调用,可以通过反射接口进行调用。但需要Server端提供io.grpc.reflection.v1alpha.ServerReflectionGrpc 服务,用于获取服务的描述文件。

大致流程:
根据方法名称,调用服务端反射服务的方法,获取方法所在proto文件根据proto描述文件,获取文件描述、服务描述,用于重新构建被调用方法的方法描述MethodDescriptor根据方法描述,将请求内容序列化为对应的类型使用重新构建的MethodDescriptor和其他参数对Server端相应的方法发起调用解析响应并返回
实现

1. proto文件定义 hello.proto

syntax = "proto3";option java_multiple_files = true;option java_package = "com.haust.hello";option java_outer_classname = "HelloProto";package com.haust.hello;service HelloService {rpc SayHello (HelloRequest) returns (HelloReply) {}}message HelloRequest {string message = 1;}message HelloReply {string message =1;}2. 构建反射服务

package com.haust.grpc.myrpc;import com.google.protobuf.*;import com.google.protobuf.util.JsonFormat;import io.grpc.CallOptions;import io.grpc.ManagedChannel;import io.grpc.ManagedChannelBuilder;import io.grpc.MethodDescriptor;import io.grpc.protobuf.ProtoUtils;import io.grpc.reflection.v1alpha.ServerReflectionGrpc;import io.grpc.reflection.v1alpha.ServerReflectionRequest;import io.grpc.reflection.v1alpha.ServerReflectionResponse;import io.grpc.stub.ClientCalls;import io.grpc.stub.StreamObserver;import java.util.List;import java.util.Map;import java.util.Objects;import java.util.concurrent.TimeUnit;import java.util.stream.Collectors;public class MyGrpc {    public static void main(String[] args) throws Exception {      // 请求方法      String methodSymbol = "com.haust.hello.HelloService.SayHello";      // 请求内容      String requestContext = "{\"message\":\"this is request\"}";      // 构建channel      final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();      // 使用channel 构建Stub 创建一个新的异步的stub      ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);      // 响应观察器      // 需要Server端 加入ProtoReflectionService.newInstance()      StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {            @Override            public void onNext(ServerReflectionResponse serverReflectionResponse) {                // 处理响应                try {                  if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {                        List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();                        // requestContent 请求内容                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);                  } else {                        System.out.println("响应处理失败" + serverReflectionResponse.getMessageResponseCase());                  }                } catch (Exception e) {                  e.printStackTrace();                }            }            @Override            public void onError(Throwable throwable) {                throwable.printStackTrace();            }            @Override            public void onCompleted() {                System.out.println("completed!");            }      };      // 请求观察器      StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);      // 根据方法名称获取文件描述请求      ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();      requestStreamObserver.onNext(getFileContainingSymbolRequest);      channel.awaitTermination(10, TimeUnit.SECONDS);    }    // 在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用    private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {      try {            // 解析方法和服务名称            String fullName = extraPrefix(methodFullName);            String methodName = extraSuffix(methodFullName);            String packageName = extraPrefix(fullName);            String serviceName = extraSuffix(fullName);            // 根据响应解析 FileDescriptor            Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);            // 查找服务描述            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);            // 查找方法描述            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);            // 发起请求            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);      } catch (Exception e) {            e.printStackTrace();      }    }    // 根据响应找到方法对应的文件的FileDescriptorProto ,然后构建出对应的 FileDescriptor    private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,                                                                String packageName,                                                                String serviceName) throws Exception {      Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =                fileDescriptorProtoList.stream()                        .map(bs -> {                            try {                              return DescriptorProtos.FileDescriptorProto.parseFrom(bs);                            } catch (InvalidProtocolBufferException e) {                              e.printStackTrace();                            }                            return null;                        })                        .filter(Objects::nonNull)                        .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));      if (fileDescriptorProtoMap.isEmpty()) {            System.out.println("服务不存在");            throw new IllegalArgumentException("方法的文件描述不存在");      }      // 查找服务对应的 Proto 描述      DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);      // 获取这个 Proto 的依赖      Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);      // 生成 Proto 的 FileDescriptor      return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);    }    // 根据包名查找响应的文件描述    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {      for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {            if (proto.getPackage().equals(packageName)) {                boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));                if (exist) {                  return proto;                }            }      }      throw new IllegalArgumentException("服务不存在");    }    // 获取依赖类型    private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {      return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)                .map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);    }    // 将FileDescriptorProto转为Descriptor    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {      try {            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);      } catch (Descriptors.DescriptorValidationException e) {            e.printStackTrace();      }      return null;    }    // 获取前缀    private static String extraPrefix(String context) {      int index = context.lastIndexOf(".");      return context.substring(0, index);    }    // 获取后缀    private static String extraSuffix(String context) {      int index = context.lastIndexOf(".");      return context.substring(index + 1);    }    // 执行方法调用    private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {      // 重新生成MethodDescriptor      MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);      CallOptions callOptions = CallOptions.DEFAULT;      TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();      // 将请求内容诸位响应的类型      JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);      DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());      parser.merge(requestContext, messageBuilder);      DynamicMessage requestMessage = messageBuilder.build();      // 调用, 调用方可以通过originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推断      DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);      // 将响应解析为JSON字符串      JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();      String responseContent = printer.print(response);      System.out.println("响应 responseContent: " + responseContent);    }    //格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor    private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {      // 生成方法全名      String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());      // 请求和响应类型      MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())                .buildPartial());      MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())                .buildPartial());      // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确      return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()                .setFullMethodName(fullMethodName)                .setRequestMarshaller(inputTypeMarshaller)                .setResponseMarshaller(outputTypeMarshaller)                // 使用 UNKNOWN,自动修改                .setType(MethodDescriptor.MethodType.UNKNOWN)                .build();    }}Server端 服务实现

package com.haust.grpc.server;import io.grpc.Server;import io.grpc.netty.NettyServerBuilder;import io.grpc.protobuf.services.ProtoReflectionService;import java.util.concurrent.TimeUnit;public class ReflectionServer {    public static void main(String[] args) {      Server server = NettyServerBuilder.forPort(9921).addService(new HelloGrpcImpl())                .addService(ProtoReflectionService.newInstance()).build();      try {            server.start();            Runtime.getRuntime().addShutdownHook(new Thread(() -> {                try {                  server.awaitTermination(10, TimeUnit.SECONDS);                } catch (InterruptedException e) {                  e.printStackTrace();                }            }));            server.awaitTermination();      } catch (Exception e) {            e.printStackTrace();      }    }}当然一般情况下,在公司的grpc服务中,是不可以开启反射的,此情况仅适用于开发环境
页: [1]
查看完整版本: gRpc 客户端调用服务接口