# mprpc **Repository Path**: win1010/mprpc ## Basic Information - **Project Name**: mprpc - **Description**: 基于muduo+protobuf的分布式网络通信框架项目 - **Primary Language**: C++ - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-06-16 - **Last Updated**: 2024-07-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 1. 项目介绍 该项目是在 Linux 环境下基于 muduo、Protobuf 和 Zookeeper 实现的一个轻量级 RPC 框架。可以把单体架构系统的本地方法调用,重构成基于 TCP 网络通信的 RPC 远程方法调用,实现同一台机器不同进程或者不同机器之间的服务调用。 ## 1.1 项目特点 - 基于 muduo 网络库实现高并发网络通信模块,作为 RPC 远程调用的基础。 - 基于 Protobuf 实现 RPC 方法调用和参数的序列化和反序列化,并根据其提供得 RPC 接口编写 RPC 服务。 - 基于 ZooKeeper 分布式协调服务中间件提供服务注册和服务发现功能。 - 基于生产者消费者模型,实现了异步工作方式的日志模块。 ## 1.2 项目演示 启动 Zookeeper,我们需要再 Zookeeper 上获取注册的服务消息,因此先需要保证启动了 Zookeeper。进入到 Zookeeper 的目录,我的 Zookeeper 目录是在`/home/zwg/xiazai/zookeeper-3.4.10`。在里面的`bin`目录下有客户端和服务端的启动脚本,先启动 Zookeeper 服务端,再启动客户端。启动客户端主要是为了看我们的 RPC 框架是否插入新服务信息。 ```bash ./zkServer.sh start #启动 Zookeeper 服务端 ./zkCli.sh #启动 Zookeeper 客户端 ``` ![image](IMG/zkserver.png) 启动服务端 ```bash ./provider -i mprpc.conf ``` 我们观察下打印的信息,可以看到它打印了 `mprpc.conf` 配置文件的信息,至少说明 RPC 框架读取配置是成功的。 ![image](IMG/provider.png) 重点是下面的信息,其显示在 Zookeeper 上注册了服务。 ![image](IMG/zkcli.png) 启动客户端 ```bash ./consumer -i mprpc.conf ``` 服务提供方显示连接成功,rpc方法调用完毕后连接断开。 ![image](IMG/connect.png) 调用者成功获取rpc方法响应。 ![image](IMG/consumer.png) 有很多的提示信息,也是解析了配置文件,并且有许多 Zookeeper 相关日志信息。这里注意最重要的几个地方,它打印显示了 `rpc GetFriendsList response success`。RPC 方法调用成功! # 2. RPC方法调用总体流程 ![image](IMG/liucheng.jpeg) # 3. RPC框架的设计 这个项目是基于 Protobuf 和 Zookeeper 的 RPC 框架,那么 Protobuf 和 Zookeeper 又在整个框架中扮演怎样的角色呢? ## 3.1 Protobuf的作用 Protobuf主要实现RPC方法调用中方法和参数的序列化和反序列化,并定义网络通信中数据的传输格式(header_size(4字节长度信息) + service_name method_name args_size(header,服务名、方法名、参数长度,参数长度用于解决粘包问题) + args(RPC方法调用所需的参数))。我们可以看一下整个框架对于传输信息的格式定义: ```protobuf syntax="proto3"; package mprpc; message MprpcHeader { bytes service_name=1; bytes method_name=2; uint32 args_size=3; } ``` 当RPC框架收到调用方发送的数据时, 可以看到,它**定义了要调用方法是属于哪个类的哪个方法以及这个方法所需要的的参数长度**。 那怎么进行使用呢?我们来看一下我们框架内传输的数据是什么:**4字节标识头部长度header_size + rpc_header_str + args_str。其中,head_size 描述的是 rpc_header_str 的长度,header_str 包括 service_name、method_name 和 args_size,args_size 描述的是 agrs_str 的长度。** 当服务者在网络中接收到框架发送的字节流后,先读取前4个字节,得到 header_size,然后读取 header_size 个长度,得到 rpc_header_str,然后对其反序列化得到 MprpcHeader 对象,然后可以提取出 service_name、method_name 和 args_size,最后再根据 args_size,读到参数 args_str 然后对其进行反序列化并提取参数。这样既得到了对象名、方法名和参数,又很好的解决了粘包问题。 ## 3.2 Zookeeper的作用 Zookeeper 在本项目中主要是起到了一个**配置中心**的作用。 什么意思呢? Zookeeper上面我们标识了每个类的方法所对应的**分布式节点地址**,当我们其他服务器或客户端想要 RPC 的时候,就去 Zookeeper 上面查询对应要调用的服务在哪个节点上。如果该服务在 Zookeeper 上注册过,那么 Zookeeper 将会返回该方法所在的 IP 地址和端口号。 Zookeeper 所起的作用就是**服务注册和服务发现**。 # 4. 从框架的使用来认识 框架的使用一般都是在 example 目录下的 `callee/FriendService.cc` 里面 ```c++ // 调用框架的初始化函数,传入命令行参数 MprpcApplication::Init(argc, argv); // provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上 MprpcProvider provider; provider.NotifyService(new FriendService()); // 启动一个rpc服务节点, Run以后,进程进入阻塞状态,等待远程的rpc调用请求 provider.Run(); ``` 可以看到,主要去做了三个事情: - 首先 RPC 框架肯定是部署到一台服务器上的,所以我们需要对这个服务器的 ip 和 port 对 RPC 框架进行初始化。 - 然后创建一个 porvider(也就是 server)对象,将当前 FriendService 这个对象传递给他,也就是对 FriendService 这个服务的所有的 RPC 方法都保存到 provider 对象中的 Map 表中记录起来。 - 最后就是让这个 provider 去 Run 起来,也就是将 RPC 方法发布到 Zookeeper 上,并启动 RPC 服务。 ```C++ // 启动rpc服务节点,开始提供rpc远程网络调用服务 void MprpcProvider::Run() { std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcServerIp"); uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcServerPort").c_str()); // muduo::net::InetAddress address(ip, port); InetAddress address(ip, port); // muduo的使用 // 1.创建TcpServer对象 // muduo::net::TcpServer server(&m_eventLoop, address, "MprpcProvider"); TcpServer server(&m_eventLoop, address, "MprpcProvider"); // 2.绑定回调函数 server.setConnectionCallback(std::bind(&MprpcProvider::OnConnection, this, std::placeholders::_1)); server.setMessageCallback(std::bind(&MprpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 3.设置线程数量 server.setThreadNum(4); // 把当前rpc节点要发布的服务注册到zk上,让rpc client可以从zk上发现服务 ZkClient zkclient; zkclient.Satrt(); // service_name节点为永久性节点,method_name为临时性节点 for (auto &sp : m_serviceMap) { // 先创建对象节点,再创建方法节点。 不能一起创建 std::string service_path = "/" + sp.first; // 拼接对象节点路径 zkclient.Create(service_path.c_str(), nullptr, 0); for (auto &mp : sp.second.m_methodMap) { std::string method_path = service_path + "/" + mp.first; // 拼接方法节点路径 char znode_data[128] = {0}; sprintf(znode_data, "%s:%d", ip.c_str(), port); // 这里注意,用的是strlen,只存有效的。 zkclient.Create(method_path.c_str(), znode_data, strlen(znode_data), ZOO_EPHEMERAL); } } // std::cout << "MprpcProvider start service at ip:" << ip << " port:" << port << std::endl; LOG_INFO_("MprpcProvider start service at ip:%s port:%d", ip.c_str(), port); // 4.启动网络服务 server.start(); m_eventLoop.loop(); // 循环执行loop } ``` 可以看到,整个 Run 其实就是干了这么几件事情: - 因为底层调用的是muduo网络库,所以这里会获取ip地址和端口号,然后初始化网络层。 - 然后去设置一个连接回调以及发生读写事件时候的回调函数。 - 然后设置整个 muduo 网络库工作的线程数量。 - 然后创建 Zookeeper 客户端,与 Zookeeper 服务端建立连接,将这些方法的信息以及本机的 IP 地址注册到 Zookeeper。 - 然后开启本机服务器的事件循环,等待其他服务器或客户端的连接。 # 5. 其他服务器或客户端是怎样调用的呢 我们看一下 example 目录下的 caller/CallFriendService.cpp 里面是怎样调用的。 ```C++ int main(int argc, char *argv[]) { // 整个程序启动以后,要想使用mprpc框架,一定要先调用初始化函数(仅调用一次) MprpcApplication::Init(argc, argv); fixbug::FriendServiceRpc_Stub stub(new MprpcChannel()); fixbug::GetFriendsListRequest request; request.set_userid(1999); // rpc方法的响应 fixbug::GetFriendsListResponse response; MprpcController controller; // 这里最后一个参数google::protobuf::Closure *done,设为了nullptr,因为我们没有采用回调发送,而是在CallMethod直接 // 编写了tcp的发送 TO PONDER // 发起rpc方法的调用,实际经过多态,最终执行MprpcChannel::CallMehtod stub.GetFriendsList(&controller, &request, &response, nullptr); if (controller.Failed()) { std::cout << controller.ErrorText() << std::endl; } else { if (0 == response.result().errcode()) { // 调用成功 std::cout << "rpc GetFriendsList response success" << std::endl; int list_size = response.friends_size(); for (int i = 0; i < list_size; i++) { std::cout << "num:" << i + 1 << " --- name:" << response.friends(i) << std::endl; } } else { std::cout << "rpc GetFriendsList response error:" << response.result().errmsg() << std::endl; } } return 0; } ``` 同样,也是有以下几个步骤: - 初始化 RPC 框架 - 定义一个 FriendService 的 stub 桩类,由这个桩类去调用GetFriendsList方法,这个 GetFriendsList方法可以去看一下源码的定义: ```C++ // 处理业务的方法 std::vector GetFriendsList(uint32_t userid) { // std::cout<<"doing local service: GetFriendsList"< vc; vc.push_back("guan yv"); vc.push_back("zhao yun"); vc.push_back("huang zhong"); vc.push_back("ma chao"); return vc; } void GetFriendsList(google::protobuf::RpcController *controller, const ::fixbug::GetFriendsListRequest *request, fixbug::GetFriendsListResponse *response, google::protobuf::Closure *done) { // 1. 从请求中获取数据。request是框架帮我们反序列化出的 uint32_t userid = request->userid(); // 2. 执行本地函数 std::vector friendsList = GetFriendsList(userid); // 3. 把响应填入Response。我们使用框架,只管填写,反序列化由框架完成 fixbug::ResultCode *result = response->mutable_result(); // 获取result字段的地址 result->set_errcode(0); result->set_errmsg(""); for (std::string &name : friendsList) { std::string *pf = response->add_friends(); // 获取最后一个待插入位置的地址 *pf = name; } // 4. 执行回调操作,即响应对象数据response的序列化和网络发送(由框架完成) done->Run(); } ``` 可以看到,GetFriendsList 的 RPC 重载函数有四个参数:**controller(表示函数是否出错)、request(参数)、response(返回值)、done(回调函数)** 其主要做的也是去围绕着解析参数,将参数放入本地调用的方法,将结果返回并执行回调函数。至于这个回调函数则是在服务端执行读写事件回调函数绑定的。 绑定的方法如下: ```C++ void MprpcProvider::senRpcResponse(const TcpConnectionPtr &conn, google::protobuf::Message *response) { std::string response_str; // 存放响应序列化后的字符串 if (response->SerializePartialToString(&response_str)) // 响应序列化 { // conn->send(response_str.c_str(), response_str.size()); // muduo库网络发送 conn->send(response_str.c_str(), response_str.size()); // muduo库网络发送 } else { // std::cout << "Serialize error!" << std::endl; LOG_ERROR_("Serialize error!"); } conn->shutdown(); // 成功失败均关闭连接 } ``` 它是将 RPC 方法调用的结果 response_str 发送给 RPC 调用方。 # 6. 桩类是干嘛的 桩类 FriendServiceRpc_Stub 是 Protobuf 帮我们自动生成的一个类。 ```C++ class FriendServiceRpc_Stub : public FriendServiceRpc { public: FriendServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel); FriendServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel, ::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership); ~FriendServiceRpc_Stub(); inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; } // implements FriendServiceRpc ------------------------------------------ void GetFriendsList(::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::GetFriendsListRequest* request, ::fixbug::GetFriendsListResponse* response, ::google::protobuf::Closure* done); private: ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_; bool owns_channel_; GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(FriendServiceRpc_Stub); }; ``` 其实 FriendServiceRpc_Stub 是一个代理类,我们需要传入 RpcChannel 指针进行初始化。但由于 RpcChannel 是一个抽象类,所以我们需要实现一个继承抽象类 RpcChannel 的子类 MprpcChannel,重写抽象类的虚函数。将 MprpcChannel 的指针传入给桩类 FriendServiceRpc_Stub,当我们去调用这个桩类的`GetFriendsList`方法的时候,会去调用传递进来的 channel 的`CallMethod`方法: ```C++ void FriendServiceRpc_Stub::GetFriendsList(::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::GetFriendsListRequest* request, ::fixbug::GetFriendsListResponse* response, ::google::protobuf::Closure* done) { channel_->CallMethod(descriptor()->method(0), controller, request, response, done); } ``` 所以,我们的 RPC 请求的发送以及响应的接收都是在 CallMethod 这个方法里面进行的。 ```C++ // 所有通过stub代理对象调用的rpc方法,都构成多态,走到了这里,统一实现rpc方法调用请求的序列化和网络发送 void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, const ::google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done) { const google::protobuf::ServiceDescriptor *serviceDesc = method->service(); // 框架对 rpc请求的序列化 // header_size + service_name method_name args_size + args std::string service_name = serviceDesc->name(); // 调用的服务对象名 std::string method_name = method->name(); // 调用的函数方法名 std::string rpc_header_str; uint32_t header_size; uint32_t args_size = 0; std::string args_str; if (request->SerializePartialToString(&args_str)) // 序列化出调用的方法参数 { args_size = args_str.size(); } else { controller->SetFailed("request serialize error!"); return; } mprpc::MprpcHeader rpcHeader; rpcHeader.set_service_name(service_name); rpcHeader.set_method_name(method_name); rpcHeader.set_args_size(args_size); if (rpcHeader.SerializePartialToString(&rpc_header_str)) { header_size = rpc_header_str.size(); } else { controller->SetFailed("rpcHeader serialize error!"); return; } // 组合成符合规则结构的要发送的字符串 std::string send_str; send_str.insert(0, std::string((char *)&header_size, 4)); // TO PONDER send_str += rpc_header_str; send_str += args_str; // 打印调试信息 // std::cout << "======================================================" << std::endl; // std::cout << "send_str: " << send_str << std::endl; // std::cout << "header_size: " << header_size << std::endl; // std::cout << "rpc_header_str: " << rpc_header_str << std::endl; // std::cout << "service_name: " << service_name << std::endl; // std::cout << "method_name: " << method_name << std::endl; // std::cout << "args_size: " << args_size << std::endl; // std::cout << "args_str: " << args_str << std::endl; // std::cout << "======================================================" << std::endl; int clientfd = socket(AF_INET, SOCK_STREAM, 0); if (clientfd == -1) { char err_buf[512] = {0}; snprintf(err_buf, 512, "clientfd create error, errno:%d", errno); controller->SetFailed(err_buf); return; } // // 获取单例对象,得到配置文件中的地址 // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcServerIp"); // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcServerPort").c_str()); // 使用zookeeper ZkClient zkclient; zkclient.Satrt(); // 建立会话 // 组装节点路径 std::string znode_path = "/" + service_name + "/" + method_name; // 读取节点数据 ip:port std::string znode_data = zkclient.GetData(znode_path.c_str()); int div = znode_data.find(":"); std::string ip = znode_data.substr(0, div); uint16_t port = atoi(znode_data.substr(div + 1).c_str()); struct sockaddr_in server_addr; bzero(&server_addr, sizeof server_addr); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); server_addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 连接rpc服务节点 int n = connect(clientfd, (struct sockaddr *)&server_addr, sizeof server_addr); if (n == -1) { close(clientfd); char err_buf[512] = {0}; snprintf(err_buf, 512, "connect error, errno::%d", errno); controller->SetFailed(err_buf); return; } // 发送给rpc服务节点 n = send(clientfd, send_str.c_str(), send_str.size(), 0); if (n == -1) { close(clientfd); char err_buf[512] = {0}; snprintf(err_buf, 512, "send error, errno::%d", errno); controller->SetFailed(err_buf); return; } // 接收rpc请求的响应 char recv_buf[RECVBUFFSIZE] = {0}; n = recv(clientfd, recv_buf, RECVBUFFSIZE - 1, 0); if (n == -1) { close(clientfd); char err_buf[512] = {0}; snprintf(err_buf, 512, "recv error, errno::%d", errno); controller->SetFailed(err_buf); return; } // rpc响应的反序列化 // recv_buf不一定满 // std::string response_str(recv_buf, 0, n); // if (!response->ParseFromString(response_str)) if (!response->ParseFromArray(recv_buf, n)) { close(clientfd); char err_buf[512] = {0}; snprintf(err_buf, 512, "response parse error, response_str:%s ", recv_buf); controller->SetFailed(err_buf); return; } close(clientfd); } ``` - 组织要发送的 sendRpcStr 字符串。 - 从 Zookeeper 中拿到服务端的 ip 和 port,连接服务端。 - 发送 sendRpcStr 字符串。 - 接收服务端返回过来的 response 对象并进行反序列化得到相应。