|
gRPC 客户端调用服务接口
如果不想依赖protobuf生成的代码文件,可以通过服务端反射接口进行调用。但需要Server端提供 io.grpc.reflection.v1alpha.ServerReflectionGrpc 服务(即注册 ProtoReflectionService),用于获取服务的描述文件。
大致流程:
1. 根据方法名称,调用服务端反射服务的方法,获取方法所在的proto文件描述;2. 根据proto描述文件,获取文件描述、服务描述,用于重新构建被调用方法的方法描述MethodDescriptor;3. 根据方法描述,将请求内容序列化为对应的类型;4. 使用重新构建的MethodDescriptor和其他参数对Server端相应的方法发起调用;5. 解析响应并返回。
实现
1. proto文件定义 hello.proto
syntax = "proto3";

package com.haust.hello;

// Java code-generation options.
option java_package = "com.haust.hello";
option java_outer_classname = "HelloProto";
option java_multiple_files = true;

// Request message for HelloService.SayHello.
message HelloRequest {
  string message = 1;
}

// Response message for HelloService.SayHello.
message HelloReply {
  string message = 1;
}

// Service with a single unary method.
service HelloService {
  rpc SayHello (HelloRequest) returns (HelloReply);
}
2. 构建反射服务
package com.haust.grpc.myrpc;

import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Calls a gRPC method without generated stubs by asking the server's reflection service for the
 * proto descriptors, rebuilding a {@link MethodDescriptor} from them and sending a JSON request
 * as a {@link DynamicMessage}.
 *
 * <p>The server must register {@code ProtoReflectionService.newInstance()} for this to work.
 */
public class MyGrpc {

    /**
     * Looks up the file that declares the target method via reflection, invokes the method with a
     * JSON request and prints the JSON response.
     *
     * @param args unused
     * @throws Exception if waiting for the reflection response or channel termination is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Fully-qualified method to call: package.Service.Method
        String methodSymbol = "com.haust.hello.HelloService.SayHello";
        // Request payload as JSON
        String requestContext = "{\"message\":\"this is request\"}";

        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();
        // Released once the reflection response has been handled (or the stream failed/completed),
        // so main() waits for the actual work instead of sleeping for a fixed time.
        final CountDownLatch finished = new CountDownLatch(1);
        try {
            // Asynchronous stub for the server reflection service
            ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);

            StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
                @Override
                public void onNext(ServerReflectionResponse serverReflectionResponse) {
                    try {
                        if (serverReflectionResponse.getMessageResponseCase()
                                == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                            List<ByteString> fileDescriptorProtoList =
                                    serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();
                            handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);
                        } else {
                            System.out.println("响应处理失败" + serverReflectionResponse.getMessageResponseCase());
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        finished.countDown();
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                    finished.countDown();
                }

                @Override
                public void onCompleted() {
                    System.out.println("completed!");
                    finished.countDown();
                }
            };

            // Open the bidirectional reflection stream and ask for the file containing the symbol
            StreamObserver<ServerReflectionRequest> requestStreamObserver =
                    reflectionStub.serverReflectionInfo(streamObserver);
            ServerReflectionRequest getFileContainingSymbolRequest =
                    ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();
            requestStreamObserver.onNext(getFileContainingSymbolRequest);

            finished.await(10, TimeUnit.SECONDS);
            // Half-close the request stream so the server can complete the reflection call
            requestStreamObserver.onCompleted();
        } finally {
            // awaitTermination only returns early after shutdown() has been requested
            channel.shutdown();
            if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
                channel.shutdownNow();
            }
        }
    }

    /**
     * Splits the method symbol into package, service and method names, resolves the matching
     * descriptors from the reflection response and performs the call. Failures are printed, not
     * propagated.
     *
     * @param fileDescriptorProtoList serialized {@code FileDescriptorProto}s from the reflection response
     * @param channel                 channel used for the actual call
     * @param methodFullName          method symbol in {@code package.Service.Method} form
     * @param requestContent          request payload as JSON
     */
    private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel,
                                       String methodFullName, String requestContent) {
        try {
            String fullName = extraPrefix(methodFullName);
            String methodName = extraSuffix(methodFullName);
            String packageName = extraPrefix(fullName);
            String serviceName = extraSuffix(fullName);

            Descriptors.FileDescriptor fileDescriptor =
                    getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);

            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
            if (serviceDescriptor == null) {
                throw new IllegalArgumentException("Service not found in file descriptor: " + serviceName);
            }

            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
            if (methodDescriptor == null) {
                throw new IllegalArgumentException("Method not found in service " + serviceName + ": " + methodName);
            }

            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Parses the reflection response, finds the proto file that declares the service and builds its
     * {@link Descriptors.FileDescriptor} together with its dependencies.
     *
     * @param fileDescriptorProtoList serialized {@code FileDescriptorProto}s
     * @param packageName             proto package of the service
     * @param serviceName             simple service name
     * @return the file descriptor declaring the service
     * @throws IllegalArgumentException if no descriptor could be parsed or the service is not found
     * @throws Exception                if the descriptor cannot be built
     */
    private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
                                                                String packageName,
                                                                String serviceName) throws Exception {
        Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap = fileDescriptorProtoList.stream()
                .map(bs -> {
                    try {
                        return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
                    } catch (InvalidProtocolBufferException e) {
                        // Best effort: skip entries that cannot be parsed
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                // Keep the first entry if the same file name appears twice instead of failing
                .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f,
                        (first, second) -> first));

        if (fileDescriptorProtoMap.isEmpty()) {
            System.out.println("服务不存在");
            throw new IllegalArgumentException("方法的文件描述不存在");
        }

        DescriptorProtos.FileDescriptorProto fileDescriptorProto =
                findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
        Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
    }

    /**
     * Finds the proto file whose package equals {@code packageName} and which declares
     * {@code serviceName}.
     *
     * @throws IllegalArgumentException if no such file exists
     */
    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(
            String packageName, String serviceName,
            Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
            if (proto.getPackage().equals(packageName)) {
                boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));
                if (exist) {
                    return proto;
                }
            }
        }
        throw new IllegalArgumentException("服务不存在");
    }

    /**
     * Recursively builds the file descriptors of every dependency declared by {@code proto}.
     *
     * @throws IllegalArgumentException if a declared dependency is missing from the map
     */
    private static Descriptors.FileDescriptor[] getDependencies(
            DescriptorProtos.FileDescriptorProto proto,
            Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        return proto.getDependencyList().stream()
                .map(dependencyName -> {
                    DescriptorProtos.FileDescriptorProto dependency = fileDescriptorProtoMap.get(dependencyName);
                    if (dependency == null) {
                        // Fail with the missing file name instead of a NullPointerException
                        throw new IllegalArgumentException(
                                "Dependency missing from reflection response: " + dependencyName);
                    }
                    return toFileDescriptor(dependency, getDependencies(dependency, fileDescriptorProtoMap));
                })
                .toArray(Descriptors.FileDescriptor[]::new);
    }

    /**
     * Converts a {@code FileDescriptorProto} into a {@link Descriptors.FileDescriptor}.
     *
     * @throws IllegalStateException if the descriptor fails validation
     */
    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto,
                                                               Descriptors.FileDescriptor[] dependencies) {
        try {
            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
        } catch (Descriptors.DescriptorValidationException e) {
            // Propagate with the cause; returning null would only fail later with less context
            throw new IllegalStateException("Invalid file descriptor: " + fileDescriptorProto.getName(), e);
        }
    }

    /**
     * Returns everything before the last {@code '.'}, or an empty string when there is no dot
     * (for example a service declared without a package).
     */
    private static String extraPrefix(String context) {
        int index = context.lastIndexOf(".");
        return index < 0 ? "" : context.substring(0, index);
    }

    /** Returns everything after the last {@code '.'}, or the whole string when there is no dot. */
    private static String extraSuffix(String context) {
        int index = context.lastIndexOf(".");
        return context.substring(index + 1);
    }

    /**
     * Parses the JSON request into the method's input type, performs a blocking unary call and
     * prints the response as JSON.
     *
     * @param channel                channel to call on
     * @param fileDescriptor         file descriptor whose message types are registered for JSON conversion
     * @param originMethodDescriptor protobuf descriptor of the method to call
     * @param requestContext         request payload as JSON
     * @throws Exception if the JSON cannot be parsed or printed
     */
    private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor,
                                    Descriptors.MethodDescriptor originMethodDescriptor,
                                    String requestContext) throws Exception {
        MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor =
                generateMethodDescriptor(originMethodDescriptor);
        CallOptions callOptions = CallOptions.DEFAULT;
        TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();

        // Parse the JSON request into a message of the method's input type
        JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);
        DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
        parser.merge(requestContext, messageBuilder);
        DynamicMessage requestMessage = messageBuilder.build();

        // Only the unary case is implemented here; the streaming flags on originMethodDescriptor
        // tell a caller which call style a method would need.
        DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);

        JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();
        String responseContent = printer.print(response);
        System.out.println("响应 responseContent: " + responseContent);
    }

    /**
     * Builds the gRPC {@link MethodDescriptor} for a protobuf method descriptor.
     *
     * <p>The protobuf full name has the form {@code package.Service.Method}, while gRPC needs
     * {@code package.Service/Method}, and both marshallers must work on {@link DynamicMessage},
     * so the descriptor is rebuilt rather than reused.
     */
    private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(
            Descriptors.MethodDescriptor originMethodDescriptor) {
        String fullMethodName = MethodDescriptor.generateFullMethodName(
                originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());

        MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(
                DynamicMessage.newBuilder(originMethodDescriptor.getInputType()).buildPartial());
        MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(
                DynamicMessage.newBuilder(originMethodDescriptor.getOutputType()).buildPartial());

        return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
                .setFullMethodName(fullMethodName)
                .setRequestMarshaller(inputTypeMarshaller)
                .setResponseMarshaller(outputTypeMarshaller)
                // Use the real call type from the proto definition instead of UNKNOWN
                .setType(resolveMethodType(originMethodDescriptor))
                .build();
    }

    /** Maps the streaming flags of the proto method definition to the gRPC method type. */
    private static MethodDescriptor.MethodType resolveMethodType(Descriptors.MethodDescriptor originMethodDescriptor) {
        DescriptorProtos.MethodDescriptorProto methodProto = originMethodDescriptor.toProto();
        boolean clientStreaming = methodProto.getClientStreaming();
        boolean serverStreaming = methodProto.getServerStreaming();
        if (clientStreaming && serverStreaming) {
            return MethodDescriptor.MethodType.BIDI_STREAMING;
        }
        if (clientStreaming) {
            return MethodDescriptor.MethodType.CLIENT_STREAMING;
        }
        if (serverStreaming) {
            return MethodDescriptor.MethodType.SERVER_STREAMING;
        }
        return MethodDescriptor.MethodType.UNARY;
    }
}
Server端 服务实现
package com.haust.grpc.server;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;

import java.util.concurrent.TimeUnit;

/**
 * Starts a gRPC server on port 9921 that exposes {@code HelloGrpcImpl} together with the
 * server reflection service, so clients can discover the proto descriptors at runtime.
 */
public class ReflectionServer {

    /**
     * Builds and starts the server, registers a shutdown hook that stops it gracefully, and blocks
     * until the server terminates. Failures are printed, not propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Server server = NettyServerBuilder.forPort(9921)
                .addService(new HelloGrpcImpl())
                .addService(ProtoReflectionService.newInstance())
                .build();
        try {
            server.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // Request shutdown first; awaitTermination alone never stops the server and would
                // only delay JVM exit by the full timeout.
                server.shutdown();
                try {
                    if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
                        // Calls still running after the grace period are cancelled
                        server.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    server.shutdownNow();
                    // Restore the interrupt flag for whoever owns this thread
                    Thread.currentThread().interrupt();
                }
            }));
            server.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
当然一般情况下,在公司的grpc服务中,是不可以开启反射的,此情况仅适用于开发环境 |
|