# SpringBoot_GRPC **Repository Path**: gumengyu/SpringBoot_GRPC ## Basic Information - **Project Name**: SpringBoot_GRPC - **Description**: Grpc+SpringBoot - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-01-04 - **Last Updated**: 2022-01-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 1. 基本的RPC模型 主要介绍RPC是什么,基本的RPC代码,RPC与REST的区别,gRPC的使用 ## 1.1 基本概念 - RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务 - 本地过程调用:如果需要将本地student对象的age+1,可以实现一个addAge()方法,将student对象传入,对年龄进行更新之后返回即可,本地方法调用的函数体通过函数指针来指定。 - 远程过程调用:上述操作的过程中,如果addAge()这个方法在服务端,执行函数的函数体在远程机器上,如何告诉机器需要调用这个方法呢? 1. 首先客户端需要告诉服务器,需要调用的函数,这里函数和进程ID存在一个映射,客户端远程调用时,需要查一下函数,找到对应的ID,然后执行函数的代码。 2. 客户端需要把本地参数传给远程函数,本地调用的过程中,直接压栈即可,但是在远程调用过程中不再同一个内存里,无法直接传递函数的参数,因此需要客户端把参数转换成字节流,传给服务端,然后服务端将字节流转换成自身能读取的格式,是一个序列化和反序列化的过程。 3.数据准备好了之后,如何进行传输?网络传输层需要把调用的ID和序列化后的参数传给服务端,然后把计算好的结果序列化传给客户端,因此TCP层即可完成上述过程,gRPC中采用的是HTTP2协议。 总结一下上述过程: ``` // Client端 // Student student = Call(ServerAddr, addAge, student) 1. 将这个调用映射为Call ID。 2. 将Call ID,student(params)序列化,以二进制形式打包 3. 把2中得到的数据包发送给ServerAddr,这需要使用网络传输层 4. 等待服务器返回结果 5. 如果服务器调用成功,那么就将结果反序列化,并赋给student,年龄更新 // Server端 1. 在本地维护一个Call ID到函数指针的映射call_id_map,可以用Map callIdMap 2. 等待服务端请求 3. 得到一个请求后,将其数据包反序列化,得到Call ID 4. 通过在callIdMap中查找,得到相应的函数指针 5. 将student(params)反序列化后,在本地调用addAge()函数,得到结果 6. 将student结果序列化后通过网络返回给Client ``` ![](./image/rpcframework.png) - 在微服务的设计中,一个服务A如果访问另一个Module下的服务B,可以采用HTTP REST传输数据,并在两个服务之间进行序列化和反序列化操作,服务B把执行结果返回过来。 ![](./image/service.png) - 由于HTTP在应用层中完成,整个通信的代价较高,远程过程调用中直接基于TCP进行远程调用,数据传输在传输层TCP层完成,更适合对效率要求比较高的场景,RPC主要依赖于客户端和服务端之间建立Socket链接进行,底层实现比REST更复杂。 ## 1.2 rpc demo ![系统类图](./image/class.png) ![系统调用过程](./image/sequence.png) 客户端: ```java public class RPCClient { public static T getRemoteProxyObj(final Class serviceInterface, final InetSocketAddress addr) { // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try{ // 2.创建Socket客户端,根据指定地址连接远程服务提供者 socket = new Socket(); socket.connect(addr); // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者 output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(serviceInterface.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); // 4.同步阻塞等待服务器返回应答,获取应答后返回 input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); }finally { if (socket != null){ socket.close(); } if (output != null){ output.close(); } if (input != null){ input.close(); } } } }); } } ``` 服务端: ```java public class ServiceCenter implements Server { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static final HashMap serviceRegistry = new HashMap(); private static boolean isRunning = false; private static int port; public ServiceCenter(int port){ ServiceCenter.port = port; } @Override public void start() throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(port)); System.out.println("Server Start ....."); try{ while(true){ executor.execute(new ServiceTask(server.accept())); } }finally { server.close(); } } @Override public void register(Class serviceInterface, Class impl) { serviceRegistry.put(serviceInterface.getName(), impl); } @Override public boolean isRunning() { return isRunning; } @Override public int getPort() { return port; } @Override public void stop() { isRunning = false; executor.shutdown(); } private static class ServiceTask implements Runnable { Socket client = null; public ServiceTask(Socket client) { this.client = client; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try{ input = new ObjectInputStream(client.getInputStream()); String serviceName = input.readUTF(); String methodName = input.readUTF(); Class[] parameterTypes = (Class[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); Class serviceClass = serviceRegistry.get(serviceName); if(serviceClass == null){ throw new ClassNotFoundException(serviceName + "not found!"); } Method method = serviceClass.getMethod(methodName, parameterTypes); Object result = method.invoke(serviceClass.newInstance(), arguments); output = new ObjectOutputStream(client.getOutputStream()); output.writeObject(result); }catch (Exception e){ e.printStackTrace(); }finally { if(output!=null){ try{ output.close(); }catch (IOException e){ e.printStackTrace(); } } if (input != null) { try { input.close(); } catch (IOException e) { e.printStackTrace(); } } if (client != null) { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } } } ``` ```java public class ServiceProducerImpl implements ServiceProducer{ @Override public String sendData(String data) { return "I am service producer!!!, the data is "+ data; } } ``` ```java public class RPCTest { public static void main(String[] args) throws IOException { new Thread(new Runnable() { @Override public void run() { try { Server serviceServer = new ServiceCenter(8088); serviceServer.register(ServiceProducer.class, ServiceProducerImpl.class); serviceServer.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); ServiceProducer service = RPCClient.getRemoteProxyObj(ServiceProducer.class, new InetSocketAddress("localhost", 8088)); System.out.println(service.sendData("test")); } } ``` ## 1.3 完整源码 [RPCdemo](https://github.com/guangxush/wheel/tree/master/RPC/src) ## 1.4 分析 这里客户端只需要知道Server端的接口ServiceProducer即可,服务端在执行的时候,会根据具体实例调用实际的方法ServiceProducerImpl,符合面向对象过程中父类引用指向子类对象。 # 2. gRPC的使用 ## 2.1. gRPC与REST - REST通常以业务为导向,将业务对象上执行的操作映射到HTTP动词,格式非常简单,可以使用浏览器进行扩展和传输,通过JSON数据完成客户端和服务端之间的消息通信,直接支持请求/响应方式的通信。不需要中间的代理,简化了系统的架构,不同系统之间只需要对JSON进行解析和序列化即可完成数据的传递。 - 但是REST也存在一些弊端,比如只支持请求/响应这种单一的通信方式,对象和字符串之间的序列化操作也会影响消息传递速度,客户端需要通过服务发现的方式,知道服务实例的位置,在单个请求获取多个资源时存在着挑战,而且有时候很难将所有的动作都映射到HTTP动词。 - 正是因为REST面临一些问题,因此可以采用gRPC作为一种替代方案,gRPC 是一种基于二进制流的消息协议,可以采用基于Protocol Buffer的IDL定义grpc API,这是Google公司用于序列化结构化数据提供的一套语言中立的序列化机制,客户端和服务端使用HTTP/2以Protocol Buffer格式交换二进制消息。 - gRPC的优势是,设计复杂更新操作的API非常简单,具有高效紧凑的进程通信机制,在交换大量消息时效率高,远程过程调用和消息传递时可以采用双向的流式消息方式,同时客户端和服务端支持多种语言编写,互操作性强;不过gRPC的缺点是不方便与JavaScript集成,某些防火墙不支持该协议。 - 注册中心:当项目中有很多服务时,可以把所有的服务在启动的时候注册到一个注册中心里面,用于维护服务和服务器之间的列表,当注册中心接收到客户端请求时,去找到该服务是否远程可以调用,如果可以调用需要提供服务地址返回给客户端,客户端根据返回的地址和端口,去调用远程服务端的方法,执行完成之后将结果返回给客户端。这样在服务端加新功能的时候,客户端不需要直接感知服务端的方法,服务端将更新之后的结果在注册中心注册即可,而且当修改了服务端某些方法的时候,或者服务降级服务多机部署想实现负载均衡的时候,我们只需要更新注册中心的服务群即可。 ![RPC调用过程](./image/recall.png) ## 2.2. gRPC与Spring Boot 这里使用SpringBoot+gRPC的形式实现RPC调用过程 项目结构分为三部分:client、grpc、server ![项目结构](./image/structure.png) ### 2.2.2 grpc ![](./image/grpc.png) pom.xml中引入依赖: ``` io.grpc grpc-all 1.12.0 ``` 引入bulid ``` kr.motd.maven os-maven-plugin 1.4.1.Final org.xolstice.maven.plugins protobuf-maven-plugin 0.5.0 grpc-java com.google.protobuf:protoc:3.0.2:exe:${os.detected.classifier} io.grpc:protoc-gen-grpc-java:1.2.0:exe:${os.detected.classifier} compile compile-custom ``` 创建.proto文件 ```proto syntax = "proto3"; // 语法版本 // stub选项 option java_package = "com.shgx.grpc.api"; option java_outer_classname = "RPCDateServiceApi"; option java_multiple_files = true; // 定义包名 package com.shgx.grpc.api; // 服务接口定义,服务端和客户端都要遵守该接口进行通信 service RPCDateService { rpc getDate (RPCDateRequest) returns (RPCDateResponse) {} } // 定义消息(请求) message RPCDateRequest { string userName = 1; } // 定义消息(响应) message RPCDateResponse { string serverDate = 1; } ``` mvn complie ![](./image/compile.png) 生成代码: ![](./image/target.png) ### 2.2.3 client ![](./image/client.png) 根据gRPC中的项目配置在client和server两个Module的pom.xml添加依赖 ![](./image/pom.png) ``` com.shgx grpc 0.0.1-SNAPSHOT compile ``` 编写GRPCClient ```java public class GRPCClient { private static final String host = "localhost"; private static final int serverPort = 9999; public static void main( String[] args ) throws Exception { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress( host, serverPort ).usePlaintext().build(); try { RPCDateServiceGrpc.RPCDateServiceBlockingStub rpcDateService = RPCDateServiceGrpc.newBlockingStub( managedChannel ); RPCDateRequest rpcDateRequest = RPCDateRequest .newBuilder() .setUserName("shgx") .build(); RPCDateResponse rpcDateResponse = rpcDateService.getDate( rpcDateRequest ); System.out.println( rpcDateResponse.getServerDate() ); } finally { managedChannel.shutdown(); } } } ``` ### 2.2.4 server ![](./image/server.png) 按照2.2.3 client的方式添加依赖 创建RPCDateServiceImpl ```java public class RPCDateServiceImpl extends RPCDateServiceGrpc.RPCDateServiceImplBase{ @Override public void getDate(RPCDateRequest request, StreamObserver responseObserver) { RPCDateResponse rpcDateResponse = null; Date now=new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("今天是"+"yyyy年MM月dd日 E kk点mm分"); String nowTime = simpleDateFormat.format( now ); try { rpcDateResponse = RPCDateResponse .newBuilder() .setServerDate( "Welcome " + request.getUserName() + ", " + nowTime ) .build(); } catch (Exception e) { responseObserver.onError(e); } finally { responseObserver.onNext( rpcDateResponse ); } responseObserver.onCompleted(); } } ``` 创建GRPCServer ```java public class GRPCServer { private static final int port = 9999; public static void main( String[] args ) throws Exception { Server server = ServerBuilder. forPort(port) .addService( new RPCDateServiceImpl() ) .build().start(); System.out.println( "grpc服务端启动成功, 端口=" + port ); server.awaitTermination(); } } ``` ## 2.3. 完整源码 [源码参考](https://github.com/guangxush/SpringBoot_GRPC) # 3. 参考文章 - [gRPC - Spring Boot 示例](https://www.jianshu.com/p/80f7929199fd) - [RPC框架解释](https://www.zhihu.com/question/25536695) - [什么是RPC](https://www.jianshu.com/p/eb66b0c4113d)