# rpc **Repository Path**: x-yunfei/rpc ## Basic Information - **Project Name**: rpc - **Description**: 手搓 rpc 框架 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-10 - **Last Updated**: 2026-03-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Rpc项目总结 1. 基于 Netty 实现底层网络通信; 2. 自定义通信协议,在消息中包含消息长度、魔法数、消息类型、版本号、序列化器和压缩器等信息; 3. Provider 端实现服务注册表,管理服务实例(实现),实现动态调用; 4. Consumer 端实现连接管理器,实现长连接,避免重复创建 channel; 5. 使用动态代理实现 Consumer 创建工厂,动态创建接口的代理对象,实现远程调用; 6. 基于 Zookeeper 实现注册中心,将服务的元数据信息存入注册中心,在Consumer 端拉取服务的元数据 list 实现负载均衡; 7. rpc 调用异常时封装请求上下信息,进行服务重试,实现forking、failover 等重试策略; 8. 为每个 channel 维护一个限流器,通过获取凭证和归还凭证操作实现限流功能,实现并发限流与速率限流; 9. 为每个 provider 维护一个熔断器,记录 rpc 调用过程中的信息作为是否熔断的参照标; 10. 使用组合模式实现默认降级策略,组合缓存降级策略和 mock 降级策略; 11. 自定义编码器和解码器 Handler,实现消息序列化/反序列化和压缩/解压缩操作; 12. 使用 Netty 的 IdleStateHandler 触发空闲事件,配合自定义的 HeartbeatHandler 实现心跳检测; 13. 在 Provider 端创建业务线程池执行具体业务,降低 woker 线程压力,提升系统吞吐量; 14. 使用 JDK 的 SPI 机制实现动态组件加载,让使用者可以传入自定义的重试策略/负载均衡/序列化/压缩等实现; 15. 实现泛化调用,让客户端无需引入完整包体,仅传递字符串等常用类型信息也能完成调用; ## 具体实现 1. 基于 Netty 实现网络通信(没什么好说的) 2. 消息格式包括:length、magic、messageType、version、serializeAndCompress、body,在自定义的Encoder(出站处理器)中重写 encode 方法,将上述信息写出到 **ByteBuf**中,在自定义的Decoder(入站处理器)中重写decode 方法,从**ByteBuf**中依次读取出上述信息; 3. 实现了一个ProviderRegistry 类,里面维护了一个serviceInstanceMap,**key 为serviceName(接口的全类名),value为服务实现的封装对象**,向外提供 register(Class interfaceClass, I serviceInstance) 接口注册服务,findService(String serviceName)接口获取服务实例,基于服务实例执行invoke(String methodName,Class[] paramsClass,Object[] params)方法就会利用反射执行具体实现,返回执行结果; 4. 实现了一个ConnectionManager 类,以维护一个channelTable, 以host:port 为 key,channel 为 value, 避免客户端连接同一 host:port 时重复创建 channel, 重复建立连接,向外提供根据 host和 port 获取 channel 的方法; 5. 实现了一个ConsumerProxyFactory 工厂类,提供createConsumerProxy 方法,传入接口的 class,利用 JDK 的动态代理机制创建代理对象,让代理对象在调用方法执行rpc 调用并获取返回结果。具体的,在调用过程中会封装一个**Request 对象**,里面有**要调用的服务名称、具体要调用的方法名称、传递的参数类型、传递的参数、requestId**等信息,代理对象会将这个Request 对象发生给服务端,服务端拿到这个Request 后会根据里面的信息从注册表中拿取具体的服务实现,并基于反射执行具体方法,写回方法的结果; 6. 定义了一个ServiceRegistry(注册中心)接口,里面定义了初始化、服务注册、服务发现接口,**服务注册**即传递一个**ServiceMetadata**对象,里面记录了服务的元数据信息,包括 host、port 以及serviceName,**服务发现**即根据serviceName返回一个**ServiceMetadata**列表,发起 rpc 调用时,consumer 端就可以基于这个实现负载均衡; 7. 执行rpc 调用时,会记录一个**在途请求**对象`CompletableFuture requestFuture`,调用成功后就会把返回结果 set 到 requestFuture 中,出现异常也会把异常写入到 requestFuture,从requestFuture 中 get拿返回结果,如果抛了异常就说明调用失败,然后就进行重试。重试会封装一个重试上下文对象RetryContext,里面记录了上次请求失败的服务信息、所有的服务信息、该方法所能容忍的剩余请求超时时间、负载均衡策略、重试发送请求的实现等信息,具体的重试策略可以根据这些信息进行重试,重试策略包括:对同一个 provider 进行重试、失败后向其它的 provider 进行重试 (故障转移)、失败后向其它的所有的 provider进行重试,只要有一个拿到结果就返回; 8. 限流器提供tryAcquire 方法尝试获取凭证,拿不到凭证快速失败,以及release 方法释放(归还)凭证。在 consumer端限流就是在新增在途请求requestFuture 时进行限流,每个channel 都有一个专属的限流器,还有一个全局的限流器;provider端限流就是自定义一个LimiterHandler 进行限流,每个channel 上下文中也有一个专属的限流器,同样也有全局限流器; 9. 熔断器的目的就是判断 请求能否发送至相应的provider中,为此需要对每个 provider 都维护一个熔断器,并记录 rpc 调用过程中的信息作为参照标准,熔断器会提供allowRequest()表示对应的 provider是否允许请求,提供recordRpc()记录rpc 调用过程中的信息,熔断器包含OPEN: provider异常,不允许发送请求、CLOSE: provider正常,允许发送请求,进行熔断后变为OPEN状态、HALF_OPEN: 允许试探性地发送一些请求,若请求正常相应,则变为CLOSE状态这 3 个状态,进行熔断后,会从CLOSE --> OPEN,超过熔断时间后会从OPEN --> HALF_OPEN,尝试性地发请求,成功就变会CLOSE,失败就继续OPEN; 10. rpc 调用失败会进行重试,如果重试也抛异常或者没有 provider 可用时,就会触发降级策略,如CacheFallback 会缓存之前调用的结果,MockFallback 会调用本地的实现方法。这里的DefaultFallback 将这2 个进行组合,先调用cacheFallback,如果失败了再执行mockFallback; 11. 实现了 json 和 hessian 序列化器,none 和 gzip 压缩器,consuemr在发送消息时,会先将消息序列化成字节,再进行压缩,并设置一个字节(serializeAndCompress)用于表示使用的序列化器和压缩器,provider 拿到消息后,会根据这个字节拿到具体的序列化器和压缩器,进行解压与反序列化。同时序列化器和压缩器支持 SPI 动态加载; 12. IdleStateHandler 可以设置读写的空闲事件,Netty 里有处理事件函数userEventTriggered,在自定义的HeartbeatHandler 重写这个函数,处理空闲事件向对端发送心跳请求,实现心跳检测; 13. worker线程处理具体的业务可能会导致1个请求阻塞其他请求,单独 创建业务线程池 invokeExecutor处理具体的业务, 结果再由worker线程写回,保证读写吞吐量; 14. 框架应当允许用户自定义序列化器、压缩器、负载均衡器、重试策略等组件,需要使用 SPI 机制进行动态组件加载,JDK 的 SPI 机制就是在 resorces.META-INF.services 下创建文件,以接口的全类名为文件名,文件里写实现类的全类型,这样 JVM 就会自动创建这些实现类,在代码中用`ServiceLoader.load(Interface.class)`就可以获取这些组件。在本项目中,会为这些组件创建一个管理器,如序列化管理器,在创建时使用`ServiceLoader.load(Serializer.class) `拿到所有的序列化组件,序列化组件会通过重写**Extension ** 接口的getName 和code 方法获取 name和 code,或者通过**@Spi**注解利用反射获取,同时维护 nameMap 和 codeMap 进行存储与管理,向外提供根据 code 或根据 name 获取具体序列化组件的方法。 15. 实现泛化调用,让客户端无需引入完整包体,仅传递字符串等常用类型信息也能完成调用;