# 仿 mudou 库 one thread one loop 并发服务器 **Repository Path**: dinosaur-security/One-Thread-One-Loop ## Basic Information - **Project Name**: 仿 mudou 库 one thread one loop 并发服务器 - **Description**: 仿 mudou 库 one thread one loop 并发服务器项目源码及介绍 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-06-06 - **Last Updated**: 2024-08-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 项目简介 该项目实现的是**主从 Reactor** 服务器模型,由**主 Reactor 线程**负责对监听描述符进行事件监控,获取新建连接,保证获取新连接的高效性,提高服务器的并发性能。主 Reactor 线程获取到新连接后分发给**从 Reactor 线**程进行通信事件的监控以及完成业务处理。通过这个高并发服务器组件,可以简洁快速的完成一个**高性能服务器**搭建,并且,通过组件内提供的不同应用层协议支持,也可以快速完成一个**高性能应用服务器**的搭建。 # 一、Buffer 模块 ## 1.1 模块简介 **功能**: 用于实现通信套接字的用户态缓冲区。 **意义**: * 防止一次接收到的数据不是一条完整的数据,因此对接收的数据进行缓冲 * 对于向客户端响应的数据,应该是在套接字可写的情况下进行发送 **需要对外提供的接口**: * 向缓冲区中添加数据 * 从缓冲区中取出数据 ## 1.2 设计思想 **需要具备的成员变量**: * **首先实现一个缓冲区得有一块内存空间**,可以自己 `malloc` ,也可以直接使用 `STL` 中的容器,这样就不需要自己去对空间进行维护。我这里使用 `vactor`,这里不建议使用 `string`,因为 `string ` 中的接口多是针对字符串的操作,遇到 `\0` 就终止了,这样做并不好,因为网络传输中可能 0 就是数据有效,我们并不希望遇到 `\0` 就终止。 * **默认的空间大小** * **当前读取数据的位置**。如果不记录当前的读取位置,那就需要每读取一部分数据,就将后面未被读取的数据向前移动,每次都从头开始读取,这样效率是比较低的,挪动字符串的时间复杂度是 `O(N)`。 * **当前写入数据的位置**。 ![image-20240411084218869](https://gitee.com/dinosaur-security/img/raw/master/img/image-20240411084218869.png) **接口实现思想**: * **写入数据**:当前写入位置指向哪里,就从哪里开始进行写入,如果后续空间不够了,此时有以下两种情况: * 考虑缓冲区整体的空间是否足够,因为读指针也在向后移动,读指针前面的空间也属于空闲空间,如果足够,无需扩容,将待读取的数据向前移动。 * 如果缓冲区的整体空间不够,此时不移动待读取数据,而是直接从读位置开始,向后扩容足够的大小。 数据一旦写入成功,当前写位置就要向后偏移。 * **读取数据**:当前的读取位置指向哪里就从哪里开始读取,前提是有数据可读,**可读数据的大小**就是当前写入位置减去当前读取位置。 ## 1.3 模块实现 **基本框架**: ```c++ #pragma once #include #include class Buffer { private: static const uint64_t default_capacity = 1024; // 默认空间大小 std::vector _buffer; // 使用 vector 进行内存空间管理 uint64_t _reader_index; // 读偏移 uint64_t _writer_index; // 写偏移 public: // 构造函数 Buffer(uint64_t capacity = default_capacity) :_buffer(capacity), _reader_index(0), _writer_index(0) {} // 获取当前写入起始地址 void *WritePos(); // 获取当前读取起始地址 void *ReadPos(); // 获取缓冲区结尾部分的空闲空间大小 uint64_t TailEmptySpace(); // 获取缓冲区开始部分的空闲空间大小 uint64_t HeadEmptySpace(); // 获取可读数据的大小 uint64_t ReadableSize(); // 将读偏移向后移动 void MoveReadIndex(uint64_t len); // 将写偏移向后移动 void MoveWriteIndex(uint64_t len); // 确保可写空间足够(整体空闲空间够了,就移动数据,否则扩容) void EnsureWriteSpace(uint64_t len); // 写入数据 void Write(void *data, uint64_t len); // 读取数据 void Read(void *buffer, uint64_t len); // 清空缓冲区 }; ``` **获取当前写入起始地址** ```c++ char *Begin() { return &*_buffer.begin(); } const char *Begin() const { return &*_buffer.begin(); } // 获取当前写入起始地址 char *WritePos() { return Begin() + _writer_index; } const char *WritePos() const { return Begin() + _writer_index; } ``` **获取当前读取起始地址** ```c++ // 获取当前读取起始地址 char *ReadPos() { return Begin() + _reader_index; } const char *ReadPos() const { return Begin() + _reader_index; } ``` **获取缓冲区结尾部分的空闲空间大小** ```c++ // 获取缓冲区结尾部分的空闲空间大小 uint64_t TailEmptySpace() { return _buffer.size() - _writer_index; } ``` **获取缓冲区开始部分的空闲空间大小** ```c++ // 获取缓冲区开始部分的空闲空间大小 uint64_t HeadEmptySpace() { return _reader_index; } ``` **获取可读数据的大小** ```c++ // 获取可读数据的大小 uint64_t ReadableSize() { return _writer_index - _reader_index; } ``` **将读偏移向后移动** ```c++ // 将读偏移向后移动 void MoveReadIndex(uint64_t len) { // 向后移动的大小,必须小于可读数据的大小 assert(len <= ReadableSize()); // _reader_index + len <= _writer_index _reader_index += len; } ``` **将写偏移向后移动** ```c++ // 将写偏移向后移动 void MoveWriteIndex(uint64_t len) { // 向后移动的大小,必须要小于尾部空闲空间的大小,注意这里不能等于,等于会出现越界的风险 assert(len < TailEmptySpace()); // _writer_index + len < _buffer.size() _writer_index += len; } ``` **确保可写空间足够(整体空闲空间够了,就移动数据,否则扩容)** ```c++ // 确保可写空间足够(整体空闲空间够了,就移动数据,否则扩容) void EnsureWriteSpace(uint64_t len) { // 缓冲区结尾的空间足够 if(len <= TailEmptySpace()) return; // 缓冲区结尾的空间不够,但是整体的空间足够,此时需要移动数据 if(len <= TailEmptySpace() + HeadEmptySpace()) { uint64_t rsz = ReadableSize(); std::copy(ReadPos(), ReadPos()+rsz, Begin()); // 把可读数据拷贝到起始位置 _reader_index = 0; // 读偏移归零 _writer_index = rsz; // 写偏移置为可读数据的大小,此时可读数据的大小就是写偏移量 } else // 缓冲区结尾的空间不足,并且整体空间也不足,此时需要扩容 { _buffer.resize(_writer_index+len); } } ``` **向缓冲区中进行写入** ```c++ // 写入数据 void Write(const void *data, uint64_t len) { if (len == 0) return; // 1. 确保可写空间足够 EnsureWriteSpace(len); // 2. 拷贝数据到缓冲区 const char *d = (const char *)data; std::copy(d, d + len, WritePos()); } // 写入数据并且修改写偏移 void WriteAndUpdateIndex(const void *data, uint64_t len) { uint64_t ret = (data, len); MoveWriteIndex(len); } // 写入字符串数据 void WriteString(const std::string &str) { Write(str.c_str(), str.size()); } // 写入字符串数据并且更新写偏移 void WriteStringUpdateIndex(const std::string &str) { WriteString(str); MoveWriteIndex(str.size()); } // 写入 Buffer 中的数据 void WriteBuffer(const Buffer &buffer) { Write(buffer.ReadPos(), buffer.ReadableSize()); } // 写入 Buffer 中的数据并且更新写偏移 void WriteBufferUpadateIndex(const Buffer &buffer) { WriteBuffer(buffer); MoveWriteIndex(buffer.ReadableSize()); } ``` **从缓冲区中读取数据** ```c++ // 读取数据 uint64_t Read(void *buffer, uint64_t len) { // 1. 确保读取的长度不超过可读数据大小 assert(len <= ReadableSize()); if(len > ReadableSize()) { lg(Warning, "Read error, because len greater than ReadableSize"); return -1; } // 2. 读取数据 char *dest = (char *)buffer; std::copy(ReadPos(), ReadPos() + ReadableSize(), dest); return len; } // 读取数据并且更新读偏移 uint64_t ReadAndUpdateIndex(void *buffer, uint64_t len) { assert(len <= ReadableSize()); if(len > ReadableSize()) { lg(Warning, "Read error, because len greater than ReadableSize"); return -1; } uint64_t ret = (buffer, len); if (ret > 0) MoveReadIndex(ret); return ret; } // 将读取的数据以 string 的形式返回 uint64_t ReadAsString(std::string *dest, uint64_t len) { // 1. 确保读取的长度不超过可读数据的大小 assert(len <= ReadableSize()); if(len > ReadableSize()) { lg(Warning, "Read error, because len greater than ReadableSize"); return -1; } // 2. 读取数据 dest->resize(len); std::copy(ReadPos(), ReadPos() + len, &(*dest)[0]); return len; } // 将读取的数据以 string 的形式返回,并且更新读偏移 uint64_t ReadAsStrUpdIndex(std::string *dest, uint64_t len) { assert(len <= ReadableSize()); if(len > ReadableSize()) { lg(Warning, "Read error, because len greater than ReadableSize"); return -1; } uint64_t ret = ReadAsString(dest, len); if (ret > 0) MoveReadIndex(ret); return ret; } // 找到可读数据中的 \r\n char *FindCRLF() { char *pos = (char *)memchr(ReadPos(), '\n', ReadableSize()); return pos; } // 读取一行数据 uint64_t GetLine(std::string *dest) { char *pos = FindCRLF(); if (pos == nullptr) return 0; uint64_t len = pos - ReadPos() + 1; uint64_t ret = ReadAsString(dest, len); return ret; } // 读取一行数据并且更新读偏移 void GetLineUpdIndex(std::string *dest) { uint64_t ret = GetLine(dest); if (ret > 0) MoveReadIndex(ret); } ``` # 二、日志打印宏 ```c++ #define INF 0 #define DBG 1 #define ERR 2 #define DEFAULT_LEVEL ERR // 用来控制哪些等级的调试信息可以打印 #define LOG(level, formate, ...) do {\ if(level < DEFAULT_LEVEL) break;\ time_t timer = time(nullptr);\ struct tm *cur_time = localtime(&timer);\ char buffer[36] = {0};\ strftime(buffer, 35, "%F %T", cur_time);\ fprintf(stdout, "[%s][%s-%d]:" formate "\n", buffer, __FILE__, __LINE__, ##__VA_ARGS__);\ }while(0) ``` **获取当前时间**:使用 `locatime` 接口,该函数需要传入一个 `time_t` 类型的指针,所以在前面需要先调用 `time` 接口,获取一个 `time_t` 类型的对象,`time` 函数的返回值,本质上是一个时间戳,是 1970 年 1 月 1 日到现在的秒数。`localtime` 接口的返回值是一个 `struct tm` 指针,该指针指向的对象中记录了当前的时间信息(年月日、时分秒等),我们可以自己提取进行打印,也可以借助 `strftime` 函数来帮我们进行处理,该函数会将我们传入的 `struct tm` 结构体指针中包含的时间信息,按照我们想要的格式,转化成一个 C 风格的字符串,因此我们还需要传一块空间,用来存储这个字符串。 # 三、Socket 模块 ## 3.1 模块简介 **功能**: 对原始 `socket` 套接字的操作进行封装。 **意义**: 程序中对于所有套接字的操作变得间简洁。 **需要对外提供的接口**: 创建套接字、绑定地址信息、监听套接字、向服务器发起连接、获取新连接、接收数据、发送数据、关闭套接字、创建一个监听连接(集成)、创建一个客户端连接(集成)、设置套接字选项开启地址端口重用、设置套接字的阻塞属性。 ## 3.2 设计思想 **需要具备的成员变量**: * **套接字描述符**:用来唯一标识一个套接字。 **接口设计思想**: 接口设计上主要是对原始的套接字接口 `socket`、`bind`、`listen`、`accept` 、`connect`、`recv`、`send`、`fcntl` 接口进行封装。 ## 3.3 模块实现 **基本框架**: ```c++ #pragma once #include class Socket { private: int _sockfd; const static int MAX_BACKLOG = 1024; public: Socket(); Socket(int sockfd); ~Socket(); // 创建套接字 bool Creat(); // 绑定地址信息 bool Bind(const uint16_t &server_port, const std::string &server_ip = "0.0.0.0"); // 监听套接字 bool Listen(int backlog = MAX_BACKLOG); // 向服务器发起连接 bool Connect(const std::string &server_ip, const uint16_t &server_port); // 获取新连接 int Accept(); // 暂时不关心客户端的 ip 和端口号 // 接收数据 ssize_t Recv(void *buffer, size_t len, int flag = 0); // 其中 flag 是用来设置阻塞还是非阻塞, 缺省为 0 表示阻塞 // 非阻塞接收数据 ssize_t NonBlockRecv(void *buffer, size_t len); // 发送数据 ssize_t Send(void *buffer, size_t len, int flag = 0); // 非阻塞发送数据 ssize_t NonBlockSend(void *buffer, size_t len); // 关闭套接字 bool Close(); // 创建一个服务端连接(集成) bool CreatServer(const uint16_t &server_port, const std::string &server_ip, int backlog = MAX_BACKLOG); // 创建一个客户端连接(集成) bool CreatClient(const std::string &server_ip, const uint16_t &server_port); // 设置套接字选项开启地址端口重用 void ReuseAddress(); // 设置套接字的阻塞属性 void NonBlock(); }; ``` **具体实现**: ```c++ #pragma once #include #include #include #include #include "Log.hpp" #include #include #include class Socket { private: int _sockfd; const static int MAX_BACKLOG = 1024; public: Socket() : _sockfd(-1) {} Socket(int sockfd) : _sockfd(sockfd) {} ~Socket() { Close(); } // 创建套接字 bool Creat() { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { ERR_LOG("Creat Socket Error! erron: %d, errmessage: %s", errno, strerror(errno)); return false; } INF_LOG("Creat Socket Sucess! sockfd: %d", _sockfd); return true; } // 绑定地址信息 bool Bind(const uint16_t &server_port, const std::string &server_ip = "0.0.0.0") { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(server_port); local.sin_addr.s_addr = inet_addr(server_ip.c_str()); socklen_t len = sizeof(local); int ret = bind(_sockfd, (sockaddr *)&local, len); if (ret < 0) { ERR_LOG("Bind Sockfd: %d Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } INF_LOG("Sockfd: %d Bind Sucess! bind_ip: %s, bind_port: %d", _sockfd, server_ip.c_str(), server_port); return true; } // 监听套接字 bool Listen(int backlog = MAX_BACKLOG) { int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG("Listen Sockfd: %d Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } INF_LOG("Listen Sockfd: %d Sucess!", _sockfd); return true; } // 向服务器发起连接 bool Connect(const std::string &server_ip, const uint16_t &server_port) { struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(server_port); inet_pton(AF_INET, server_ip.c_str(), &(server.sin_addr)); socklen_t len = sizeof(server); int ret = connect(_sockfd, (sockaddr *)&server, len); if (ret < 0) { ERR_LOG("Sockfd: %d Connect Server Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } INF_LOG("Sockfd: %d Connect Server Sucess!", _sockfd); return true; } // 获取新连接 int Accept() { int newsockfd = accept(_sockfd, nullptr, nullptr); // 不关心客户端的 ip 和端口号 if (newsockfd < 0) { if (errno == EAGAIN | errno == EINTR) { return -1; } ERR_LOG("Sockfd: %d Accept Connection Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return -1; } INF_LOG("Sockfd: %d Accept A New Connection! newsockfd: %d", _sockfd, newsockfd); return newsockfd; } // 接收数据 ssize_t Recv(void *buffer, size_t len, int flag = 0) // 其中 flag 是用来设置阻塞还是非阻塞, 缺省为 0 表示阻塞 { ssize_t ret = recv(_sockfd, buffer, len, flag); if (ret < 0) { // EAGAIN: 表示当前缓冲区中已经没有数据了,在非阻塞的情况下才会有这个报错 // EINTR: 表示当前的 socket 的阻塞等待,被信号打断了 if (errno == EAGAIN || errno == EINTR) { return 0; // 表示没有接收到数据 } else ERR_LOG("Sockfd: %d Recv Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return -1; } else if(ret == 0) { INF_LOG("Client Quit, Sockfd: %d Also Need Quit", _sockfd); Close(); } return ret; // 实际接受数据的长度 } // 非阻塞接收数据 ssize_t NonBlockRecv(void *buffer, size_t len) { return Recv(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示非阻塞式接收数据 } // 发送数据 ssize_t Send(const void *buffer, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buffer, len, flag); if (ret < 0) { ERR_LOG("Sockfd: %d Send Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return -1; } return ret; } // 非阻塞发送数据 ssize_t NonBlockSend(const void *buffer, size_t len) { return Send(buffer, len, MSG_DONTWAIT); } // 关闭套接字 void Close() { if (_sockfd >= 0) { int ret = close(_sockfd); if (ret == 0) { INF_LOG("Sockfd: %d Close Sucess!", _sockfd); _sockfd = -1; } else ERR_LOG("Sockfd: %d Close Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); } } // 创建一个服务端连接(集成) bool CreatServer(const uint16_t &server_port, const std::string &server_ip = "0.0.0.0", int backlog = MAX_BACKLOG) { // 1. 创建套接字 2. 设置非阻塞 3. 开启地址端口重用 4. 绑定套接字 5. 监听套接字 if (!Creat()) return false; if (!NonBlock()) return false; if (!ReuseAddress()) return false; if (!Bind(server_port, server_ip)) return false; if (!Listen(backlog)) return false; return true; } // 创建一个客户端连接(集成) bool CreatClient(const std::string &server_ip, const uint16_t &server_port) { // 1. 创建套接字 2. 发起连接 if (!Creat()) return false; if (!Connect(server_ip, server_port)) return false; return true; } // 设置套接字选项开启地址端口重用 bool ReuseAddress() { int i = 0; // 第二个参数表示选项定义的层次,SOL_SOCKET 表示基本套接口选项 // 第三个参数是需要设置的选项名,SO_REUSEADDR 表示复用 ip 地址和端口,SO_REUSEPORT 可要可不要 // 第四个参数指向变量或结构的指针,包含选项的新值 // 第五个参数是变量的大小 if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &i, sizeof(i)) == -1) { ERR_LOG("Set Sockfd: %d ReuseAddr Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } return true; } // 设置套接字的阻塞属性 bool NonBlock() { // 先获取套接字现有的状态标志,然后再进行设置,这样会保持该套接字原有的状态 int fl = fcntl(_sockfd, F_GETFL); if (fl == -1) { ERR_LOG("Get Sockfd: %d Flags Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } if (fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK) == -1) { ERR_LOG("Set Sockfd: %d NonBlock Error! errno: %d, errmessage: %s", _sockfd, errno, strerror(errno)); return false; } return true; } // 返回当前的 _sockfd int GetSockfd() { return _sockfd; } }; ``` # 四、Channel 模块 ## 4.1 模块简介 **功能**: `Channel` 模块是对一个描述符上的事件管理模块,实现对描述符可读、可写、错误等事件的管理操作(添加、删除)。以及事件就绪后,根据不同的事件,回调不同的处理函数。 **意义**: 对于一个描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加清晰。 **需要提供的接口**: 分为以下两大类: * **对监控事件的管理**:是否监控了读事件、是否监控了写事件、添加对读事件的监控,添加对写事件的监控、解除读事件监控、解除写事件监控、解除所有事件监控、移除对该文件描述符的监控(从 epoll 底层的红黑树上移除)。 * **对事件触发后的处理**:对于不同事件的回调函数,明确触发了某个事件之后应该怎么处理。这里只关心可读事件、可写事件、挂断事件、错误事件、任意事件。 ## 4.2 设计思想 **需要具备的成员变量**: * 一个文件描述符 `fd` * 因为是采用 `epoll` 对事件进行监控,所以需要一个 `uint32_t` 类型的变量,来存储**需要监控的事件** * 还需要一个 `uint32_t` 类型的变量来保存当前已经**就绪的事件** * 若干事件的回调函数 * **需要关联一个 EventLoop**,添加对文件描述符上的事件监控,需要去调用 **EventLoop** 中的接口,因为一个 **EventLoop** 关联一个 **Poller** 也就是 **epoll**,**Poller** 只是对 **epoll** 操作的封装,**Poller** 模块对用户来书是不可见的,用户只需要通过 **EventLoop** 中的接口去向 **epoll** 中添加(移除)事件监控即可 **接口设计思想**: * 首先是一批对该文件描述符上的事件进行管理的接口、添加读事件监控、添加写事件监控、取消读事件监控、取消写事件监控、是否监控了读事件、是否监控了写事件、取消所有事件的监控、移除对该文件描述符的监控。 * 一批设置回调函数的接口,设置读事件触发后的回调、设置写事件触发后的回调、设置关闭事件触发后的回调、设置错误时间触发后的回调、设置任意事件触发后的回调。 * 处理就绪事件的接口。**EventLoop** 会将有事件就绪的文件描述符对应的 **Channel** 对象返回,此时需要每个 **Channel** 对象自己去调用 **HandlerEvent** 接口,判断是该描述符上的什么事件就绪了,然后执行对应的回调函数。 * 其它的一些接口,如:获取该文件描述符要关心的事件,在 **EventLoop** 向 **epoll** 添加一个描述符上的事件监控时,需要知道该文件描述符需要监控哪些事件。并且最终在执行 **epoll_wait** 的时候,会返回一个描述符上已就绪的事件,所以还需要一个接口将以就绪的事件设置进与其对应的 **Channel** 对象中。 ## 4.3 模块实现 ```c++ class Channel { private: int _fd; // 一个文件描述符 EventLoop *_eventloop; uint32_t _events; // 保存要关心的事件 uint32_t _revents; // 保存已就绪的事件 using event_func_t = std::function; event_func_t _read_callback; // 读事件回调 event_func_t _writ_callback; // 写事件回调 event_func_t _error_callback; // 错误事件回调 event_func_t _close_callback; // 连接断开事件件回调 event_func_t _event_callback; // 任意事件回调 public: Channel(int fd, EventLoop *eventloop) : _fd(fd), _eventloop(eventloop), _events(0), _revents(0) { } // 是否监控了读事件 bool IsReadAble() { return (_events & EPOLLIN); } // 是否监控了写事件 bool IsWritAble() { return (_events & EPOLLOUT); } // 添加对读事件的监控 void AddReadEvent() { _events |= EPOLLIN; // 后续调用 EventLoop 中的接口来添加事件监控 UpdateEvent(); } // 添加对写事件的监控 void AddWritEvent() { _events |= EPOLLOUT; // 后续调用 EventLoop 中的接口来添加事件监控 UpdateEvent(); } // 取消对读事件的监控 void CancleReadEvent() { _events &= (~EPOLLIN); // 后续调用 EventLoop 中的接口来取消对事件的监控 UpdateEvent(); } // 取消对写事件的监控 void CancleWritEvent() { _events &= (~EPOLLOUT); // 后续调用 EventLoop 中的接口来取消对事件的监控 UpdateEvent(); } // 取消对所有事件的监控 void CancleAllEvent() { _events = 0; // 后续调用 EventLoop 中的接口来取消对事件的监控 UpdateEvent(); } // 移除对一个文件描述符的监控 void Remove(); // 从底层红黑树中移除 // { // // 后续调用 EventLoop 中的接口来移除对一个文件描述符的监控 // _poller->RemoveFdEvent(this); // } void UpdateEvent(); // 设置已就绪的事件 void SetRevents(const uint32_t &revents) { _revents = revents; } // 获取要关心的事件 uint32_t GetEvents() { return _events; } // 获取文件描述符 int GetFd() { return _fd; } // 设置回调函数 void SetReadCallback(const event_func_t &cb) { _read_callback = cb; } void SetWrieCallback(const event_func_t &cb) { _writ_callback = cb; } void SetErrorCallback(const event_func_t &cb) { _error_callback = cb; } void SetCloseCallback(const event_func_t &cb) { _close_callback = cb; } void SetEventCallback(const event_func_t &cb) { _event_callback = cb; } // 事件处理 void HandlerEvent() // 一旦有事件触发,就调用这个函数,在这个函数内部判断触发了什么时间,然后调用对应的回调函数。 { // 刷新活跃度 // EPOLLPRI:有紧急数据可读时被触发,通常用于带外数据 // EPOLLRDHUP: 表示对方关闭了连接或者是写操作,此时这一方需要读数据,所以还是调用读事件回调 if ((_revents & EPOLLIN) || (_revents & EPOLLPRI) || (_events & EPOLLRDHUP)) { if (_event_callback) _event_callback(); if (_read_callback) _read_callback(); } // 有可能释放连接的操作一次只处理一个 if (_revents & EPOLLOUT) { if (_event_callback) _event_callback(); if (_writ_callback) _writ_callback(); } else if (_events & EPOLLERR) { if (_event_callback) _event_callback(); if (_event_callback) _event_callback(); } // EPOLLHUP: 与某个文件描述符相关的socket连接被对方关闭或者重置,或因为某种错误而关闭了连接,表示文件描述符被挂断,一般该事件触发表示文件描述符已经不再可用 else if (_events & EPOLLHUP) { if (_event_callback) _event_callback(); if (_close_callback) _close_callback(); } } }; ``` # 五、Poller 模块 ## 5.1 模块简介 **功能**: 对 `epoll` 接口进行封装,实现对任意文件描述符进行事件监控管理。 **意义**: 通过对 `epoll` 接口进行封装,使得用户对文件描述符进行事件监控操作变得更容易。 **需要对外提供的接口**: * 添加或更新描述符所监控的事件 * 移除对描述符的监控 * 开启事件监控 ## 5.2 设计思想 **需要具备的成员变量**: * 必须拥有一个 `epoll` 操作的句柄 * 拥有一个 `struct epoll_event` 类型的数组,用来保存已就绪的事件 * 还需要一个 `hash` 表来管理文件描述符和该文件描述符对应的 `Channel` 对象,主要是在事件就绪后,需要将就绪事件设置进该文件描述符对应的 **Channel** 对象中,所以在对一个文件描述符开始监控前,需要先把该文件描述符和它对应的 **Channel** 对象保存起来。 **接口设计思想**: **添加或更描述符所监控的事件**,先判断该文件描述符是否在 `hash` 表中,如果不在说明是要向 `epoll` 中新增一个文件描述符的事件监控,如果存在,说明需要修改该文件描述符上的事件监控,接下来从一个文件描述符对应的 `Channel` 对象中获取到该文件描述符要关心的事件,调用相关的添加或者修改接口。 **开始监控**:当某个文件描述符就绪后,在 `hash` 表中找到该文件描述符对应的 `Channel` 对象,将该文件描述符上已经就绪的事件设置进 `Channel` 对象。 **移除对文件描述符的监控**:通过对 `epoll` 原生接口进行封装,实现对一个文件描述符移除监控,并将其从 `hash` 表中移除。 ## 5.3 模块实现 ```c++ class Poller { static const int MAX_EPOLLEVENTS = 1024; private: int _epfd; // epoll 操作的句柄 struct epoll_event _eves[MAX_EPOLLEVENTS]; // 保存就绪的事件 std::unordered_map _channels; // 管理已经被监控的文件描述符,和对应的事件 private: // 对 epoll 直接操作 void EpollCtl(Channel *channel, int op) { int fd = channel->GetFd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->GetEvents(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG("fd: %d EpollCtl Error, errno: %d, errmsg: %s", fd, errno, strerror(errno)); } } // 判断一个文件描述符是否被监控了 bool IsInEpoll(Channel *channel) { auto it = _channels.find(channel->GetFd()); if (it == _channels.end()) return false; return true; } public: Poller() { _epfd = epoll_create(1); if (_epfd < 0) { ERR_LOG("Epollfd Creat Error, errno: %d, errmsg: %s", errno, strerror(errno)); abort(); // 退出程序 } INF_LOG("epfd: %d creat success", _epfd); } // 添加或修改对文件描述符的监控 void UpdateFdEvent(Channel *channel) { if (!IsInEpoll(channel)) { _channels.insert(std::make_pair(channel->GetFd(), channel)); EpollCtl(channel, EPOLL_CTL_ADD); } else EpollCtl(channel, EPOLL_CTL_MOD); } // 移除对文件描述符的监控 void RemoveFdEvent(Channel *channel) { if (IsInEpoll(channel)) { EpollCtl(channel, EPOLL_CTL_DEL); _channels.erase(channel->GetFd()); } else ERR_LOG("fd: %d is not in epoll", channel->GetFd()); } // 开始监控,返回就绪事件 void Poll(std::vector *ready) { int nfds = epoll_wait(_epfd, _eves, MAX_EPOLLEVENTS, -1); // 阻塞式等待 if (nfds < 0) { if (errno == EAGAIN) { WAR_LOG("Epoll_Wait Interrupted by EAGAIN"); return; } else { ERR_LOG("Epoll_Wait Error, errno: %d, errmsg: %s", errno, strerror(errno)); return; } } for (int i = 0; i < nfds; i++) { int fd = _eves[i].data.fd; auto it = _channels.find(fd); it->second->SetRevents(_eves[i].events); ready->push_back(it->second); } } }; ``` # 六、TimerWheel 模块 ## 6.1 模块简介 **功能**: 宏观上 **TimerWheel** 是一个定时任务模块,可以向 **TimerWheel** 中添加一个定时任务,所谓定时任务就是,该任务在规定的一段时间后会自动去执行,任务说白了就是一个函数,就是让当前线程在一段时间之后去调用某个函数。微观上,该模块主要是对一个连接的生命周期进行管理,对非活跃连接进行超时释放功能。 **意义**: 对非活跃连接进行定时释放,可以缓解系统压力。节省系统资源,提高系统效率。 **需要提供的接口**: 添加一个定时任务、刷新一个定时任务、取消一个定时任务。 ## 6.2 设计思想 定时器采用 **时间轮** 的思想来进行设计。时间轮的思想,源于钟表,例如:定一个 8 点钟的起床闹铃,那么当时针走到 8 的时候,就代表时间到了,要去执行起床这件任务。同理,在 **TimerWheel** 模块中定义一个数组(相当于表盘),再定义一个指针 `tick`(相当于表中的指针),指向数组起始位置,这个指针每秒钟向后走动一步,走到哪里,就代表哪里的任务该被执行了,此时,如果我们想要定一个 3s 后的任务,则只需要将任务添加到数组下标为 `tick+3` 的位置,等 3s 后,指针走到这里,就去执行这里的任务。但是,同一时间可能会有大批的定时任务,因此我们可以给数组对应位置下拉一个数组,也就是二维数组,这样就可以在同一时刻上添加多个定时任务了。 ### **6.2.1 如何让指针每秒钟向后走一步?** 采用 **Linux** 为我们提供的定时器文件描述符来实现 **创建一个定时器**: ```c++ #include #include #include #include #include int main() { int tfd = timerfd_create(CLOCK_MONOTONIC, 0); if(tfd < 0) { perror("timerfd_create"); return -1; } struct itimerspec iti; iti.it_value.tv_sec = 1; // 设置第一次超时时间为 1 秒 iti.it_value.tv_nsec = 0; iti.it_interval.tv_sec = 1; // 设置第一次超时之后的超时时间也为 1 秒 iti.it_interval.tv_nsec = 0; int ret = timerfd_settime(tfd, 0, &iti, nullptr); if(ret < 0) { perror("timerfd_settime"); return -1; } while(1) { uint64_t time; int ret = read(tfd, &time, sizeof(time)); if(ret < 0) { perror("read"); return -1; } std::cout << "距离上一次读取超时了: " << time << "次" << std::endl; } return 0; } ``` 因此我们需要再 **TimerWheel** 模块中维护一个**定时器文件描述符 tfd**,并且对该文件描述符添加读事件监控,设置超时时间为 1s,此后每个 1s 系统就会自动向该定时器文件中写入一个 8 字节数据,触发可读事件,之后执行读事件回调函数,在该函数中让指针向后走一步。所以 **TimerWheel** 模块中需要关联一个 **EventLoop**,还要有一个 **tfd 对应的 Channel 对象**。 ### 6.2.2 如何确保一个任务在时间到后被执行以及正确的刷新? 这里对任务进行封装,将任务封装成对象,在任务对象的析构函数中去执行任务,每一次只要当前指针位置的任务对象数组 **clear** 掉,定时任务就会因为任务对象的释放调用析构函数去执行对应的任务。任务刷新,需要借助 **shared_ptr** 来实现,数组里不是直接存储任务对象,而是存储一个被 **shared_ptr** 维护的任务兑对象,这样在 **clear** 当前指针指向的任务对象数组时,不会直接释放对象,而是看 **shared_ptr** 的引用计数,如果为 0,才去销毁对象调用析构,执行任务,否则标明该任务被刷新过,就不会去调用析构,执行任务。所以在 **TimerWheel** 中需要维护一个任务对象和它的 **weak_ptr**,这样方便在任务刷新的时候,只需要根据任务 **id**,找到对应的 **weak_ptr**,将其转换成 **shared_ptr**,添加到新的超时时间点。 ## 6.3 模块实现 ```c++ using task_func_t = std::function; using releas_func_t = std::function; class TimerTask { public: TimerTask(uint64_t taskid, const task_func_t &task, uint32_t timeout, const releas_func_t &releas) : _taskid(taskid), _task(task), _overtime(timeout), _releas(releas), _is_cancle(false) { } void SetReleas(const releas_func_t &cb) { _releas = cb; } int GetTimeOut() { return _overtime; } void SetCancle() { _is_cancle = true; } ~TimerTask() { if (!_is_cancle) { _task(); // 在对象销毁的时候去执行任务 // INF_LOG("执行任务"); } _releas(); } private: uint64_t _taskid; // 任务ID, 用于唯一标识一个任务 task_func_t _task; // 具体要执行的任务 uint32_t _overtime; // 超时时间, 单位是秒 releas_func_t _releas; // 删除 TimerWheel 中 _tasks_ptr 中记录的一个对象的 shared_ptr bool _is_cancle; // 表示是否取消当前任务 }; class TimerWheel // 封装时间轮 { private: using Task_Shared_ptr = std::shared_ptr; // 类型重命名 using Task_Weak_ptr = std::weak_ptr; // 类型重命名 int _capacity; // 数组的最大容量, 最大的延时时间 int _tick; // 滴答指针, 指导那里就执行那里的任务 std::vector>> _whell; // 存放任务的数组 std::unordered_map> _tasks_ptr; // 存放所有任务的 shared_ptr 指针, 因为一个任务不止设置一个超时,可能会重新设置超时,所以需要保存该任务的 shared_ptr 指针 int _tfd; // 一个定时器文件描述符 EventLoop *_eventloop; // 将定时器文件描述符也监控起来 std::unique_ptr _tfd_channel; // 定时器描述符上的事件 private: // 从 _tasks_ptr 中移除一个任务的信息 void ReleasTask(uint64_t taskid) { auto it = _tasks_ptr.find(taskid); if (it != _tasks_ptr.end()) { _tasks_ptr.erase(taskid); } } static int CreatTfd() { int tfd = timerfd_create(CLOCK_MONOTONIC, 0); if (tfd < 0) { ERR_LOG("timerfd_create error, errno: %d, errmsg: %s", errno, strerror(errno)); abort(); } struct itimerspec iti; iti.it_value.tv_sec = 1; // 设置第一次超时时间为 1 秒 iti.it_value.tv_nsec = 0; iti.it_interval.tv_sec = 1; // 设置第一次超时之后的超时时间也为 1 秒 iti.it_interval.tv_nsec = 0; int ret = timerfd_settime(tfd, 0, &iti, nullptr); if (ret < 0) { ERR_LOG("timerfd: %d settime error, errno: %d, errmsg: %s", tfd, errno, strerror(errno)); abort(); } return tfd; } void TfdReadHandler() { uint64_t outtime; int ret = read(_tfd, &outtime, sizeof(outtime)); if (ret < 0) { if (errno == EINTR) return; ERR_LOG("read timerfd: %d error, errno: %d, errmsg: %s", _tfd, errno, strerror(errno)); abort(); } } // 时间轮启动, 该函数应该每秒钟被执行一次, 相当于秒针向后走了一步 void RunTimerWhell() { _whell[_tick].clear(); // 清空指定位置的数组 _tick = (_tick + 1) % _capacity; // INF_LOG("cur time: %d", _tick); } void OnTime() { TfdReadHandler(); RunTimerWhell(); } // 因为一个 EventLoop 中是维护一个定时器的,所以要保证向一个定时器中添加任务的操作是在当前 EventLoop 线程中进行的 // 所以,对于一个一个定时器,如果要向里面添加任务,不能直接调用下面这个接口,因为可能存在两个线程 A 线程正在执行时间轮上的任务,而 B 线程在往时间轮里面添加任务,此时会出问题 // 解决方案:不用加锁,只要需要调用 EventLoop 中的 RunInLoop 接口,将 AddTimerTask 函数传过去,此时 RunInLoop 线程会判断,如果是当前 EventLoop 线程执行的添加操作 // 那么该添加操作会直接执行,如果是其他线程对这个定时器执行的添加操作,那么 EventLoop 会把这个操作添加到自己的任务池中,最后由自己去执行 // 所以下面对这个 AddTimerTaskInLoop 函数进行封装,封装成 AddTimerTask // 这样就保证了对该定时器的添加操作是由一个 EventLoop 线程来执行的。 // 添加一个定时器任务 void AddTimerTaskInLoop(uint64_t taskid, const task_func_t &task, uint32_t timeout) { std::shared_ptr new_task(new TimerTask(taskid, task, timeout, std::bind(&TimerWheel::ReleasTask, this, taskid))); // 创建一个任务对象 _tasks_ptr[taskid] = std::weak_ptr(new_task); // 将该任务的指针保存起来 int pos = (_tick + timeout) % _capacity; _whell[pos].push_back(new_task); // 将任务添加到表中 INF_LOG("add a new task: %ld, first pos: %d", taskid, pos); } // 更新一个定时任务同样存在上面的问题,也需要考虑多线程的问题 // 更新一个定时任务, 将任务重新定时 void UpdateTimerTaskInLoop(uint64_t taskid) { auto it = _tasks_ptr.find(taskid); if (it == _tasks_ptr.end()) return; // 表中没有该任务 std::shared_ptr task = it->second.lock(); // 构建一个新的定时任务 int timeout = task->GetTimeOut(); int pos = (_tick + timeout) % _capacity; _whell[pos].push_back(task); // INF_LOG("update a task: %ld, overtime: %d, capacity: %d, new pos: %d", taskid, timeout, _capacity, pos); } // 取消一个任务同样存在上面的问题,也需要考虑多线程的问题 // 取消一个任务 void CancleTimerTaskInLoop(uint64_t taskid) { auto it = _tasks_ptr.find(taskid); if (it == _tasks_ptr.end()) return; Task_Shared_ptr pt = it->second.lock(); pt->SetCancle(); } public: TimerWheel(EventLoop *eventloop) : _capacity(60), _tick(0), _whell(_capacity), _tfd(CreatTfd()), _eventloop(eventloop), _tfd_channel(new Channel(_tfd, _eventloop)) { _tfd_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _tfd_channel->AddReadEvent(); // 添加可读事件监控,每秒钟会触发一次 // 当定时器的可读事件触发后,不仅要去读取定时器文件描述符,更重要的是,定时器可读事件触发,意味着 1 秒已经过去,时间轮应该往后走一格 } // 添加一个定时器任务 void AddTimerTask(uint64_t taskid, const task_func_t &task, uint32_t timeout); // 更新一个任务 void UpdateTimerTask(uint64_t taskid); // 取消一个任务 void CancleTimerTask(uint64_t taskid); // 判断当前任务是否在时间轮中 // 这个接口有线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的 EventLoop 线程内执行 bool IsInTimerWhell(uint64_t taskid) { auto it = _tasks_ptr.find(taskid); if (it == _tasks_ptr.end()) return false; return true; } }; ``` # 七、EventLoop 模块 ## 7.1 模块简介 **功能**: **EventLoop** 模块是 **OneThreadOneLoop** 的核心。一个 **EventLoop** 模块就对应了一个线程,该线程对一批文件描述符(连接)进行监控,将来一个连接一定会绑定一个 **EventLoop** 模块,多个连接可以绑定同一个 **EventLoop**,对一个连接的所有操作都是在这个 **EventLoop** 线程中来完成的。 **意义**: **EventLoop** 模块是对 **Poller** 模块和 **TimerWheel** 模块的封装,也就是说用户要监控一个文件描述符不需要自己去调用 **Poller** 模块,而是通过调用 **EventLoop** 模块中的接口,由 **EventLoop** 去调用 **Poller** 中的相关接口,完成事件监控。一个 **EventLoop** 对应一个 **Poller**。同时一个 **EventLoop** 也对应一个 **TimerWheel** 定时器,用户要添加定时任务不是直接调用 **TimerWheel** 中的接口,而是调用 **EventLoop** 中相关的接口,再由 **EventLoop** 去调用 **TimerWheel** 中的相关接口,实现定时任务的添加。 **需要提供的接口**: * 和事件监控相关的:添加修改事件监控,移除对文件描述符的监控 * 和定时任务相关:添加一个定时任务、更新一个定时任务、取消一个定时任务 * 和 eventfd 相关的:创建一个 eventfd,eventfd 处理可读事件的函数 * 其他:判断将要执行的任务是否是当前 EventLoop 线程,将任务添加到当前 EventLoop 的任务池 ## 7.2 设计思想 这里最主要的是如何保证对一个连接的所有操作都在当前的 EventLoop 线程中,解决方案是在 EventLoop 中添加一个任务池,在外部执行一个对连接的操作时,先检查执行该操作的线程是否是当前 EventLoop 线程,如果是那该操作就可以直接被执行,如果不是,那么就需要把这个操作放入任务池中,最终由 EventLoop 去任务池中拿到任务执行。 举个例子:对一个连接添加定时关闭任务,这个添加操作一定是在外部,由用户来调用 `eventloop->AddTimerTask(...)` 执行的,不是 EventLoop 自动执行的,就可能会出现最终调用该函数的线程,并不是 EventLoop 线程,此时就可能出现线程安全问题,线程安全问题出在,此时会有两个线程去访问 **TimerWheel** 中的“表”,**EventLoop** 线程访问“表”去执行定时任务,说白了也就是清楚数组中的任务对象,另外一个线程也去访问“表”,向“表”里添加一个定时任务,说白了就是向数组中添加任务对象,此时数组就成了一个共享资源,可能被多线程同时访问,进而诱发线程安全问题。为了解决这个问题,我们需要对 **TimerWheel** 中的接口做进一步的封装,因为一个定时器绑定了一个 **EventLoop**,一个 **EventLoop** 就对应了一个线程,以添加定时任务为例,要保证添加操作是由绑定的这个 **EventLoop** 线程来执行,就需要在执行具体的添加操作之前,先检查执行该操作的线程是否是当前的 **EventLoop** 线程,如果是就直接执行,不是则将添加任务的操作放进 **EventLoop** 中的任务池中,由 **EventLoop** 线程抽空来执行。 由于 **EventLoop** 在调用 **Poller** 执行事件监控的时候,会阻塞住,这可能导致任务池中的任务不能得到及时的执行,因此我们需要有一个 **eventfd**,来进行事件通知,**eventfd** 本质上是内核帮我们维护的一个计数器,每当向 **eventfd** 中写入一个数值,用于表示事件通知的次数,可以用 read 进行数据读取,读取到的数据就是通知的次数,每一次读取都会将计数清零。我们可以把这个 **eventfd** 也监控起来,每次向任务池中添加一个任务的同时,向 **eventfd** 中写入一个 1,触发可读事件,然后去执行任务池中的所有任务。 **需要具备的成员变量**: * 一个 **Poller** 对象,用于事件的监控 * 一个 **eventfd** 用于事件的监控 * 一个任务池 * 一把锁,用来保护任务池,如上面所说,可能会有另一个线程来往任务池里面添加任务,EventLoop 线程始终要从任务池中拿任务执行,所以要对任务池加锁保护 * 一个定时器 * 一个 **thread::id**,记录当前 **EventLoop** 线程的线程 id **接口设计思想**: 根据上述内容设计相应的接口,不再过多赘述。 ![image-20240419135123325](https://gitee.com/dinosaur-security/img/raw/master/img/image-20240419135123325.png) ## 7.3 模块实现 ```c++ class EventLoop { private: using task_func_t = std::function; std::thread::id _thread_id; // 当前 EventLoop 线程的 ID Poller _poller; // 进行事件监控 int _event_fd; // eventfd 唤醒事件监控可能导致的阻塞 std::unique_ptr _event_fd_channel; // _event_fd 上的事件 std::vector _tasks_poll; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全 TimerWheel _timerwheel; // 当前 EventLoop 的定时器 private: // 执行任务池中的所有任务 void RunAllTask() { std::vector tmp; { std::unique_lock _lock(_mutex); _tasks_poll.swap(tmp); } for (auto &task : tmp) { task(); // 执行任务 } } static int GetEventFd() { // EFD_CLOEXEC: 表示则当创建该描述符的进程执行一个exec函数时,新的事件文件描述符会自动关闭 // EFD_NONBLOCK: 用于将新创建的事件文件描述符设置为非阻塞模式 int efd = eventfd(0, EFD_CLOEXEC); if (efd < 0) { ERR_LOG("eventfd creat error, errno: %d, errmsg: %s", errno, strerror(errno)); abort(); // 让程序异常退出 }; INF_LOG("eventfd: %d creat success", efd); return efd; } void EventFdReader() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (res < 0) { if (errno == EINTR) { return; } ERR_LOG("read efd error, errno: %d, errmsg: %s", errno, strerror(errno)); abort(); } else return; } void WeakupEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (EINTR == errno) return; ERR_LOG("write efd error, errno: %d, errmsg: %s", errno, strerror(errno)); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()), // 获取当前线程的 id _event_fd(GetEventFd()), _event_fd_channel(new Channel(_event_fd, this)), _timerwheel(this) { // 设置 _event_fd 的回调函数 _event_fd_channel->SetReadCallback(std::bind(&EventLoop::EventFdReader, this)); _event_fd_channel->AddReadEvent(); // 添加对读事件的监控 } // 判断当前线程是否是 EventLoop 对应的线程 bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } // 判断将要执行的任务是否处于当前线程中,如果是则执行,如果不是则压入任务池中 void RunInLoop(const task_func_t &cb) { if (IsInLoop()) return cb(); return AddInPoll(cb); // 如果任务不再当前线程,那么将任务压入任务池中 } // 将任务加入任务池 void AddInPoll(const task_func_t &cb) { { std::unique_lock lock(_mutex); _tasks_poll.push_back(cb); } // 唤醒有可能因为没有事件就绪,而导致的 epoll 阻塞 // 本质操作就是向 eventfd 中写入一个数据,让 eventfd 的读事件就绪 WeakupEventFd(); } // 添加、修改描述符的事件监控 void UpdateEvent(Channel *channel) { return _poller.UpdateFdEvent(channel); } // 移除描述符的监控 void RemoveEvent(Channel *channel) { return _poller.RemoveFdEvent(channel); } // 启动当前 Poller 的事件监控--->就绪事件的处理--->执行任务 void Start() { // 1. 启动事件监控 std::vector channels; // 存放已经就绪的事件 _poller.Poll(&channels); // 2. 就绪事件处理 for (auto &channel : channels) { channel->HandlerEvent(); } // 3. 执行任务 RunAllTask(); } // 添加一个定时任务 void AddTimerTask(uint64_t taskid, const task_func_t &task, uint32_t timeout) { _timerwheel.AddTimerTask(taskid, task, timeout); } // 更新一个定时任务 void UpdateTimerTask(uint64_t taskid) { _timerwheel.UpdateTimerTask(taskid); } // 取消一个定时器任务 void CancleTimerTask(uint64_t taskid) { _timerwheel.CancleTimerTask(taskid); } // 判断一个任务是否在时间轮定时器中 // 因为一个任务可能在时间轮定时器中,只是被取消了,我们可以重新恢复它 bool IsInTimer(uint64_t taskid) { return _timerwheel.IsInTimerWhell(taskid); } }; void Channel::UpdateEvent() { _eventloop->UpdateEvent(this); // _eventloop->UpdateFdEvent(this); } void Channel::Remove() { // _eventloop->RemoveFdEvent(this); _eventloop->RemoveEvent(this); } // 添加一个定时器任务 void TimerWheel::AddTimerTask(uint64_t taskid, const task_func_t &task, uint32_t timeout) { _eventloop->RunInLoop(std::bind(&TimerWheel::AddTimerTaskInLoop, this, taskid, task, timeout)); } // 更新一个任务 void TimerWheel::UpdateTimerTask(uint64_t taskid) { _eventloop->RunInLoop(std::bind(&TimerWheel::UpdateTimerTaskInLoop, this, taskid)); } // 取消一个任务 void TimerWheel::CancleTimerTask(uint64_t taskid) { _eventloop->RunInLoop(std::bind(&TimerWheel::CancleTimerTaskInLoop, this, taskid)); } ``` # 八、EventLoop、TimerWheel、Poller、Channel 联合调试 **一个 server 服务端**: ```c++ #include #include #include #include "Server.hpp" void HandleClose(Channel *channel) { // std::cout << "Close: " << channel->Fd() << std::endl; DBG_LOG("close fd: %d", channel->GetFd()); channel->Remove(); // 移除监控 delete channel; } void HandleRead(Channel *channel) { int fd = channel->GetFd(); char buffer[1024] = {0}; int ret = recv(fd, buffer, 1023, 0); if(ret <= 0) { channel->CancleReadEvent(); return; // return HandleClose(channel); // 这里不能直接去调用 HandleClose,因为添加了定时任务,定时任务就是去执行 HandleClose,如果这里执行了,会导致时间到了后 HandleClose 再一次被执行 } channel->AddWritEvent(); // 启动可写事件 // std::cout << buffer << std::endl; DBG_LOG("%s", buffer); } void HandleWrite(Channel *channel) { int fd = channel->GetFd(); const char *data = "天气还不错!!"; int ret = send(fd, data, strlen(data), 0); if(ret < 0) { return HandleClose(channel); } channel->CancleWritEvent(); } void HandleError(Channel *channel) { return HandleClose(channel); } void HandleEvent(EventLoop *eventloop, Channel *channel, uint64_t taskid) { eventloop->UpdateTimerTask(taskid); } void Accepter(EventLoop *eventloop, Channel *lst_channel) { uint64_t taskid = rand() % 10000; int fd = lst_channel->GetFd(); int newfd = accept(fd, nullptr, nullptr); Channel *channel = new Channel(newfd, eventloop); channel->SetReadCallback(std::bind(HandleRead, channel)); channel->SetWrieCallback(std::bind(HandleWrite, channel)); channel->SetCloseCallback(std::bind(HandleClose, channel)); channel->SetErrorCallback(std::bind(HandleError, channel)); channel->SetEventCallback(std::bind(HandleEvent, eventloop, channel, taskid)); eventloop->AddTimerTask(taskid, std::bind(HandleClose, channel), 10); channel->AddReadEvent(); } int main(int args, char *argv[]) { srand(time(nullptr)); EventLoop eventloop; Socket listen_sock; if (!listen_sock.CreatServer(8080)) { DBG_LOG("Creat Server Error"); return 0; } Channel listen_channel(listen_sock.GetSockfd(), &eventloop); listen_channel.SetReadCallback(std::bind(Accepter, &eventloop, &listen_channel)); listen_channel.AddReadEvent(); while (1) { eventloop.Start(); } } ``` **client 端**: ```c++ #include #include #include #include "Server.hpp" int main() { Socket client_sock; if (!client_sock.CreatClient("127.0.0.1", 8080)) { DBG_LOG("CreatClient Error"); return 0; } for(int i = 0; i < 5; i++) { std::string client_say("Hello Server"); client_sock.Send(client_say.c_str(), client_say.size()); char buffer[1024] = {0}; int num = client_sock.Recv(buffer, sizeof(buffer) - 1); if (num > 0) { buffer[num] = 0; DBG_LOG("Client Recv A Message: %s", buffer); } sleep(1); } while(1) { sleep(1); } return 0; } ``` ![img](file:///C:/Users/wcy/AppData/Local/Packages/Microsoft.Windows.Photos_8wekyb3d8bbwe/TempState/ShareServiceTempFolder/一个简单的EventLoop服务器模块关系图.jpeg) # 九、Connection 模块 ## 9.1 模块简介 **功能**: **Connection** 模块是对于通信连接进行整体管理的一个模块,对一个连接的操作都是通过这个模块进行的。 **意义**: **Connection** 模块是对 **Buffer** 模块、**Socket** 模块、**Channel** 模块的一个整体封装,实现了对一个通信套接字的整体管理,每一个进行数据通信的套接字都会使用 **Conneection** 进行管理。 ## 9.2 设计思想 * **Connection** 模块内部包含有四个由组件使用者传入的回调函数,这四个回调函数分别对应连接的四个不同阶段:连接建立完成阶段的回调、读取到数据的业务处理回调、连接关闭阶段的回调C、任意事件发生后的回调。 * **Connection** 模块内部包含有两个为组件使用者提供的接口:数据发送接口、连接关闭接口。这个数据发送接口的本质工作是把用户数据放入自定义的发送缓冲区,然后启动写事件监控,等触发写事件后,在写事件回调函数通过 **Socket** 进行真正的数据发送。连接关闭也不是直接将连接断开,而是先要检查是否有数据待处理或者发送,在一切处理妥当后再将连接进行关闭。 * **Connection** 模块中包含有两个用户态缓冲区:用户接受缓冲区、用户态发送缓冲区。 * **Connection** 模块内部包含一个 **Socket** 对象,完成描述符面向系统的 I/O 操作。 * **Connection** 模块中包含一个 **Channel** 对象,完成描述符事件就绪的处理。 * **Connection** 模块中包含一个 **EventLoop** 指针,将当前连接绑定到一个 **EvevtLoop** 对象,完成对该描述符上的事件监控。 **具体处理流程**: * 实现向 **Channel** 提供可读、可写、错误等不同时间的 IO 事件回调函数,然后将 **Channel** 和对应的描述符添加到 **Poller** 中进行监控 * 当描述符在 **Poller** 模块中就绪了 IO 可读事件,则调用描述符对应的 **Channel** 中的读事件处理回调函数,进行数据读取,这里的读取是从内核的接受缓冲区中将数据拷贝到用户态缓冲区。然后调用由组件使用者传入的新数据到来回调函数进行业务处理 * 组件使用者在业务处理完毕后,通过 **Connection** 向使用者提供的数据发送接口,将数据存放到用户态的发送缓冲区中 * 启动描述符在 **Poller** 模块中的 IO 写事件监控,事件就绪后,调进用 **Channel** 中保存的写事件处理函数,将发送缓冲区中的数据通过 **Socket** 用户态发送缓冲区中的数据拷贝到内核的发送缓冲区中。 **应该具备的成员变量**: * 一个标识连接的唯一 ID,便于对连接的管理和查找 * 连接的状态,连接在不同阶段需要有对应的状态进行标识。 * 连接关联的文件描述符 * 连接对应的一个 **Socket** 对象,通过该对象实现对连接套接字的管理 * 连接对应的一个 **Channel** 对象,进行连接的事件管理 * 连接绑定的一个 **EventLoop** 对象 * 连接对应的输入缓冲区,存放从 **Socket** 中读取到的数据,因为一次读取到的并不是一个完整的数据,数据不完整用户无法进行业务处理,所以需要我们提供一个缓冲区,把一次接收到的数据存储起来 * 连接对应的输出缓冲区,存放要发送给对端的数据。当发送缓冲区中有数据了,启动读事件监控,当读事件就绪了再调用 `send` 接口将发送缓冲区中的数据拷贝到内核,这样可以避免直接调用 `send` 时,可能因为内核发送缓冲区已经满了,造成发送阻塞 * 请求的接受处理上下文,当前接收到的数据只有请求的一半,需要等剩下一半数据到来了再进行处理,所以需要保存请求处理到哪里了,以便下一次请求到来的时候,接着进行处理,因为这个组件支持不同的协议,而不同协议对数据处理的方法可能不同,所以记录上下文的一定是一个可以接受任意类型的对象 * 一批阶段性的回调函数,从连接建立到连接销毁中间的一些不同阶段,为用户提供一些回调函数,方便用户在不同阶段执行某些任务。如:连接建立之后要执行的回调、读取到数据之后要执行的业务回调函数、连接关闭后要执行的回调、任意事件的回调。 * 是否启动非活跃连接销毁选项,因为可能用户本身就需要长连接。默认设置为 `false` 表示不启动。 * 当前连接要执行定时任务的 ID,这里用连接的 ID,作为定时任务的 ID ## 9.3 模块实现 ```c++ class Connection; using SPtrConnection = std::shared_ptr; class Connection : public std::enable_shared_from_this { private: uint64_t _conn_id; // 连接的唯一 ID,便于连接的管理和查找 // uint64_t _timer_id; // 定时器 ID int _sockfd; // 一个连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为 false EventLoop *_loop; // 连接关联的一个 EventLoop ConnStatu _statu; // 连接的状态 Socket _socket; // 连接的套接字管理 Channel _channel; // 连接的事件管理 Buffer _in_buffer; // 输入缓冲区---存放从 socket 中读取到的数据 Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 // 为用户提供一些阶段性的回调函数,便于用户使用 // 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的) // 也就是说,这四个回调函数是执行用户设置的函数 using ConnectionCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; ConnectionCallback _connection_callback; // 连接建立成功之后的回调函数 MessageCallback _message_callback; // 读取到数据后的业务处理回调函数 ClosedCallback _closed_callback; // 连接关闭后的回调函数 AnyEventCallback _anyevent_callback; // 任意事件触发的回调函数 // 组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭,就应该从管理的地方移除掉自己的信息 ClosedCallback _server_closed_callback; private: // 五个 Channel 的事件回调函数 // 可读事件触发后的回调函数 void HandleRead() { // 读取 socket 缓冲区中的数据到自定义的输入缓冲区,然后调用 _message_callback // 1. 接受 socket 的数据,放到接受缓冲区 char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, sizeof(buf) - 1); if (ret < 0) { // 因为经过封装,所以对端关闭这里 ret 的值还是 -1 return ShutdownInloop(); // 读取出错或者对端关闭,不是直接关闭连接,而是先看是否还有数据待处理 } // 读取数据成功 // 将数据放入输入缓冲区 _in_buffer.WriteAndUpdateIndex(buf, ret); // 2. 调用 _message_callback 进行业务处理 if (_in_buffer.ReadableSize() > 0) // 输入缓冲区中有数据了再调用业务处理函数 { // shared_from_this--从当前对象自身获取自身的 shared_ptr 管理对象 return _message_callback(shared_from_this(), &_in_buffer); } return; } // 可写事件触发后的回调函数 void HandleWrite() { // 将发送缓冲区中的数据进行发送 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPos(), _out_buffer.ReadableSize()); if (ret < 0) { // 小于 0 表示发送出错 // 此时要检查接收缓冲区中是否还有数据未处理 if (_in_buffer.ReadableSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); // 调用业务处理函数,读取输入缓冲区的数据进行业务处理 } // 发送出错,直接关闭连接 return ReleaseInloop(); // 真正的关闭连接 } // 发送成功 _out_buffer.MoveReadIndex(ret); // 将缓冲区的数据发送出去后 将读指针向后移动 if (_out_buffer.ReadableSize() == 0) // 发送缓冲区中没有数据了 { _channel.CancleWritEvent(); // 取消对读事件的监控 if (_statu == DISCONNECTING) // 此时连接如果处于待关闭状态,关闭连接 { return ReleaseInloop(); } } // 如果发送缓冲区中还有数据,就不关闭写事件监控,等待下一次触发写事件 return; } // 挂断事件触发后的回调函数 void HandleClose() { // 一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接 if (_in_buffer.ReadableSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); // 调用业务处理函数,读取输入缓冲区的数据进行业务处理 } return ReleaseInloop(); } // 错误事件触发后的回调函数 void HandleError() { return HandleClose(); } // 任意事件触发后的回调函数 void HandleEvent() { // 延时定时销毁任务、调用用户的任意事件处理回调函数 if (_enable_inactive_release == true) { _loop->UpdateTimerTask(_conn_id); } if (_anyevent_callback) { _anyevent_callback(shared_from_this()); } return; } // 这个接口并不是真正的发送数据接口,只是把数据放到了发送缓冲区中,启动了可写事件监控,等触发了可写事件之后,才去真正的发送数据 void SendInloop(Buffer buf) { if (_statu == DISCONNECTED) return; _out_buffer.WriteBufferUpadateIndex(buf); if (!_channel.IsWritAble()) // 如果没有启动读事件监控,就启动读事件监控 _channel.AddWritEvent(); return; } // 提供给组件使用者的关闭接口 void ShutdownInloop() { // 并不直接关闭,而是先检查是否还有数据待处理,待发送 // 将连接的状态设置为 DISCONNECTING // 检查发送缓冲区中是否还有数据,如果有,启动写事件监控,没有则关闭连接 _statu = DISCONNECTING; if (_in_buffer.ReadableSize() > 0) { // 有可能因为接受缓冲区中的数据不完整,导致用户回调函数中,并没有把数据读走,这里不管用户是否读走,后续都直接关闭连接 _message_callback(shared_from_this(), &_in_buffer); } if (_out_buffer.ReadableSize() > 0) // 如果进入了这一个分支,连接最终一定会被关闭 { // 如果有数据待发送就启动写事件监控 // 触发写事件的时候,数据发送失败写回调函数中会直接关闭连接 // 触发写事件的时候,数据如果发送成功,且发送缓冲区中没有数据了,连接状态为 DISCONNECTING,连接也会直接关闭 // 如果一次没有发送完,写事件监控不会被取消,等着下一次写事件就绪,再次进行写入即可 if (!_channel.IsWritAble()) _channel.AddWritEvent(); } if (_out_buffer.ReadableSize() == 0) // 如果进入了这一分支,连接最终也一定会被关闭 ReleaseInloop(); } // 实际的释放接口 void ReleaseInloop() { // 1. 修改连接状态 _statu = DISCONNECTED; // 2. 移除连接的事件监控 _loop->RemoveEvent(&_channel); // 3. 关闭描述符 _socket.Close(); // 4. 如果当前定时器任务队列中还有任务,则取消任务 if (_loop->IsInTimer(_conn_id)) { CancleInactiveReleaseInloop(); // DBG_LOG("CancleInactiveReleaseInloop is run"); } // 5. 调用关闭时的回调函数 if (_closed_callback) // 用户的回调函数 _closed_callback(shared_from_this()); if (_server_closed_callback) // 服务器组件内部的回调函数,移除服务器内部管理的连接信息 _server_closed_callback(shared_from_this()); return; } // 启动非活跃连接的超时销毁 void EnableInactiveReleaseInloop(int timeout) { // timeout 的单位为 s // 1. 修改判断标志 _enable_inactive_release = true; // 2. 如果当前定时销毁任务已经存在,那就刷新一下延迟即可 if (_loop->IsInTimer(_conn_id)) return _loop->UpdateTimerTask(_conn_id); // 3. 如果当前定时销毁任务不存在,那就新增一个定时销毁任务 return _loop->AddTimerTask(_conn_id, std::bind(&Connection::ReleaseInloop, this), timeout); } // 取消非活跃连接的超时销毁 void CancleInactiveReleaseInloop() { // 1. 修改状态标记 _enable_inactive_release = false; // 2. 取消定时任务 if (_loop->IsInTimer(_conn_id)) _loop->CancleTimerTask(_conn_id); return; } // 协议切换 void UpgradeInloop(const Any &context, const ConnectionCallback &con_cb, const MessageCallback &mes_cb, const ClosedCallback &clo_cb, const AnyEventCallback &any_cb) { // 重置上下文和各种阶段性的回调函数 _context = context; _connection_callback = con_cb; _message_callback = mes_cb; _closed_callback = clo_cb; _anyevent_callback = any_cb; } // 连接获取之后,所处的状态下要进行的各种设置(启动读监控, 调用用户的回调) void EstablishedInloop() { // 1. 修改连接状态 2. 启动读事件监控 3. 调用回调函数 // 一旦启动读事件监控就有可能立即触发读事件,如果启动了非活跃连接销毁,那么此时就会去刷新定时任务延迟定时任务的执行 // 如果把启动读事件监控放在构造函数中,此时定时任务还没有添加,此时在事件轮中是找不到该定时任务的,逻辑上存在问题 // 所以要保证启动读事件监控要在设置了定时任务之后执行 assert(_statu == CONNECTING); // 当前的状态必须一定是上层的半连接状态 _statu = CONNECTED; // 当前函数执行完毕,连接进入已完成连接的状态,可以开始进行正常通信 _channel.AddReadEvent(); if (_connection_callback) _connection_callback(shared_from_this()); return; } public: Connection(EventLoop *eventloop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(eventloop), _statu(CONNECTING), _socket(_sockfd), _channel(_sockfd, _loop) { // 设置事件触发的回调函数 _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWrieCallback(std::bind(&Connection::HandleWrite, this)); } ~Connection() { DBG_LOG("release connection: %p", this); } // 获取连接的文件描述符 int Fd() { return _sockfd; } // 获取连接的 ID uint64_t Id() { return _conn_id; } // 是否处于连接建立完成处于可通信的状态 bool IsConnected() { return (_statu == CONNECTED); } // 设置上下文信息 void SetContext(const Any &context) { _context = context; } // 获取上下文信息 Any *GetContext() { return &_context; } // 设置阶段回调函数 void SetConnectionCallback(const ConnectionCallback &con_cb) { _connection_callback = con_cb; } void SetMessageCallback(const MessageCallback &mes_cb) { _message_callback = mes_cb; } void SetClosedCallback(const ClosedCallback &clo_cb) { _closed_callback = clo_cb; } void SetAnyEventCallback(const AnyEventCallback &any_cb) { _anyevent_callback = any_cb; } void SetServerCloseCallback(const AnyEventCallback &ser_clo_cb) { _server_closed_callback = ser_clo_cb; } // 发送数据 void Send(const char *data, size_t len) { // 外界传入的 data,可能是一个临时空间,我们现在只是把发送操作压入了任务池,由可能并没有被立即执行 Buffer buf; buf.WriteAndUpdateIndex(data, len); // 将数据拷贝到发送缓冲区 启动写事件监控 _loop->RunInLoop(std::bind(&Connection::SendInloop, this, buf)); } // 提供给组件使用者的关闭接口 void Shutdown() { // 并不直接关闭,而是先检查是否还有数据待处理 // 将连接的状态设置为 DISCONNECTING // 检查发送缓冲区中是否还有数据,如果有,启动写事件监控,没有则关闭连接 _loop->RunInLoop(std::bind(&Connection::ShutdownInloop, this)); } // 启动非活跃连接的超时销毁 void EnableInactiveRelease(int timeout) { // timeout 的单位为 s _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInloop, this, timeout)); } // 取消非活跃连接的超时销毁 void CancleInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancleInactiveReleaseInloop, this)); } // 协议切换 // 这个接口必须立即执行,这就要求必须由 eventloop 线程来发起并执行,因为如果是其它线程发起,那么协议切换操作会被压入任务队列,并不会立即处理 // 此时如果有新事件到来还是用上一次的协议进行数据处理,就会有问题 void Upgrade(const Any &context, const ConnectionCallback &con_cb, const MessageCallback &mes_cb, const ClosedCallback &clo_cb, const AnyEventCallback &any_cb) { _loop->AssetInLoop(); // 重置上下文和各种阶段性的回调函数 _loop->RunInLoop(std::bind(&Connection::UpgradeInloop, this, context, con_cb, mes_cb, clo_cb, any_cb)); } // 连接建立就绪后,进行 Channel 回调设置,启动度监控,调用 _connection_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInloop, this)); } }; ``` # 十、Acceptor 模块 ## 10.1 模块简介 **功能**: 实现对一个监听套接字的整体管理 **意义**: 是对 **Socket** 模块、**Channel** 模块的一个整体封装,方便对监听套接字的使用,以及在新连接获取后的处理 ## 10.2 设计思想 当获取一个新连接的文件描述符后,需要为这个通信连接,封装一个 **Connection** 对象,设置连接各个阶段的回调函数。需要注意的是,因为 **Acceptor** 模块本身并不知道一个新连接产生了该干些什么,因为连接的各阶段的回调函数是用户自定义的函数,所以获取一个新连接后,生成 **Connection** 对象,以及为这个对象设置不同阶段的回调函数,都是由服务器模块来进行,用户最终直接使用的是服务器模块,由服务器模块实现对所有连接对象的管理。 **处理流程**: * 创建一个监听套接字 * 启动对监听套接字的读事件监控 * 读事件触发后,调用读事件处理函数 * 在该读事件函数内部先将新连接获取上来也就是得到新连接的文件描述符,然后调用由服务器模块设置的新连接到来回调函数 **需要具备的成员变量**: * 一个 **Socket** 对象,用于对监听套接字进行管理和各种操作 * 一个 **EventLoop** 指针,绑定一个 EventLoop,用于实现对监听套接字上的读事件进行监控 * 一个 **Channel** 对象,用于实现对监听套接字上的事件进行管理 * 一个新连接到来的回调函数 ## 10.3 模块实现 ```c++ class Acceptor { private: Socket _socket; // 用于创建监听套接字 EventLoop *_loop; // 用于对监听套接字进行事件监控 Channel _channel; // 用于对监听套接字进行事件管理 using AcceptCallback = std::function; AcceptCallback _accept_cb; // 获取到一个新连接后执行的回调函数,其中 fd 是新连接的文件描述符 private: void HandelRead() { int newfd = _socket.Accept(); if (newfd < 0) return; if (_accept_cb) _accept_cb(newfd); return; } // 创建一个监听套接字 int CreatServer(const uint16_t &port) { bool ret = _socket.CreatServer(port); assert(ret == true); return _socket.GetSockfd(); } public: // 不能将启动读事件监控,放到构造函数中,必须在将新连接获取后的回调函数设置后再去启动 // 否则有可能造成启动监控后,立即有连接到来,但是此时获取新连接后的回调函数还没有设置,新连接得不到处理,造成资源泄露 Acceptor(EventLoop *loop, const uint16_t &port) : _loop(loop), _socket(CreatServer(port)), _channel(_socket.GetSockfd(), _loop) { _channel.SetReadCallback(std::bind(&Acceptor::HandelRead, this)); // 设置读事件触发后的回调函数 } void SetAcceptCallback(const AcceptCallback &cb) { _accept_cb = cb; } // 开始监听,启动读事件监控 void Listen() { return _channel.AddReadEvent(); } }; ``` **注意事项**:不能将启动读事件监控放在构造函数中,因为我们是单独提供了一个接口来设置新连接到来后的回调函数。如果把启动读事件监控放在构造函数中,那可能会在 **Acceptor** 对象刚建立完,就有一个新连接到来,此时会触发读事件函数,在读事件函数中去调用了新连接到来的回调函数,但是此时可能新连接到来的回调函数还没有被设置,此时就会导致服务器模块无法对这个新来的连接进行有效的管理,无法实现正常的通信。所以这里的解决方案是,单独提供一个接口 `Listen`,来启动读事件监控,并且要求服务器模块在将新连接到来的回调函数设置好之后再来调用这个 `Listen` 接口,启动对读事件的监控。 # 十一、LoopThread 模块 ## 11.1 模块简介 **功能意义**: 将一个 **EventLoop** 对象和一个线程关联起来。 ## 11.2 设计思想 一个 **LoopThread** 对应一个线程,对应一个 **EventLoop** 对象。**EventLoop** 模块实例化的对象,在构造的时候会初始化内部的 `_thread_id` 成员变量。一个连接绑定了一个 **EventLoop** 对象,后面再执行对一个连接的操作的时候,会判断执行该操作的线程是否是 **EventLoop** 对应的线程,本质就是去比较 **EventLoop** 中的 `_thread_id` 和当前执行操作线程的 `id` 是否相等。如果不同就表示执行该操作的线程并不是 **EventLoop** 线程,需要把该操作添加到 **EventLoop** 中的任务池中。所以这就要要求 **EventLoop** 对象必须在它关联的线程内部实例化,这样才能保证一个 **EventLoop** 对象实例化出来的瞬间就是和该线程绑定的。如果是先创建 **EventLoop** 对象,然后再创建线程,然后再将已经创建的 **EventLoop** 对象中的 `_thread_id` 设置成创建出来的线程 `id` ,此时中间会有一段 “真空期”,**EventLoop** 对象中的 `_thread_id` 并不是最终要关联的线程 `id`,此时如果对一个连接执行操作会出问题。所以必须先创建线程,在线程函数内部去实例化 **EventLoop** 对象。 在一个新连接到来创建 **Connection** 对象的时候,需要将其和一个 **EventLoop** 关联,所以该模块需要提供一个获取 **EventLoop** 对象地址的函数,但是会存在下面这种情况,用户在获取 **EventLoop** 对象地址的时候,此时 **EventLoop** 对象还没有被其线程创建出来,可能该线程一直没有别执行,而一般获取新连接创建 **Connection** 对象的都是主线程,此时就涉及到线程的同步问题,在一个线程实例化出 **EventLoop** 对象之前,是不允许其它线程来获取该 **EventLoop** 对象的地址的 **需要具备的成员变量**: * 关联的 **EventLoop** 指针 * 关联的线程 * 一把互斥锁 * 一个条件变量 ## 11.3 模块实现 ```c++ class LoopThread { private: // 互斥锁和条件变量用于实现获取 _loop 的同步关系,避免线程创建了,但是 _loop 还没有实例化之前去获取 _loop // 将来一定是 EventLoop 线程来实例化创建一个 EventLoop 对象,有一个主线程要来获取这个 _loop,所以要用锁和条件变量来实现 同步与互斥 std::mutex _mutex; // 互斥锁 std::condition_variable _cond; // 条件变量 EventLoop *_loop; // EventLoop 指针变量,这个对象需要在线程内部实例化 std::thread _thread; // EventLoop 对应的线程 private: // 实例化 EventLoop 指针,并开始运行 EventLoop 模块的功能 void ThreadEntry() { // 在线程函数内部创建 EventLoop 对象,这样创建出来的 EventLoop 对象中的 _thread_id 就是当前线程 EventLoop loop; // LoopThread 正常情况下是一直在循环运行的,如果 LoopThread 销毁了,EventLoop 也会跟着销毁 { std::unique_lock lock(_mutex); _loop = &loop; _cond.notify_all(); // 唤醒所有可能正在阻塞的线程 } _loop->Start(); } public: // 创建线程,设定线程入口函数 LoopThread() : _loop(nullptr), _thread(std::thread(&LoopThread::ThreadEntry, this)) { } // 返回当前线程关联的 EventLoop 对象指针 EventLoop *GetLoop() { EventLoop *loop = nullptr; { std::unique_lock lock(_mutex); // 使用 unique_lock 对锁进行管理,出了作用域会自动释放 if (_loop == nullptr) // 条件不就绪就去条件变量下等待 _cond.wait(lock); // 条件就绪 loop = _loop; } return loop; } }; ``` # 十二、LoopThreadPool 模块 ## 12.1 模块简介 **功能意义**: 本质上是一个线程池,对所有的 **LoopThread** 进行管理和分配。 * 实现线程数量的可配置 * 为每一个新创建的 **Connection** 对象提供线程分配的功能 ## 12.2 设计思想 * 实现线程数量的可配置:在服务器中,主从 **Reactor** 模型是主线程只负责新连接的获取,从属想爱你成负责新连接的事件监控以及处理,所以在当前服务器中,从线程的数量可能为 0,也就是主从 **Reactor** 服务器变成单 **Reactor** 服务器,一个线程既负责获取新连接,也负责对连接的事件监控和事件处理。 * 提供线程分配的功能,当主线程获取了一个新连接之后,需要将新连接挂接到从属线程上进行事件监控以及处理,假设有 0 个从属线程,则直接分配给主线程的 **EventLoop**,进行处理,假设有多个从属线程,则采用 RR 轮转思想,进行线程的分配(将对应线程的 **EventLoop** 获取到,设置给对应的 **Connection** 对象。 ## 12.3 模块实现 ```c++ class LoopThreadPool { private: int _thread_count; // 从属线程的数量 int _next_idx; // 下一个要分配的 EventLoop EventLoop *_base_loop; // 主线程的 EventLoop,当从线程数量为 0 的时候就要用主线程 std::vector _threads; // 线程池 std::vector _loops; // 保存所有的 Eventloop 信息 public: LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _base_loop(baseloop) { } void SetThreadCount(int count) { _thread_count = count; } // 创建出对应数量的从属线程 void Create() { if (_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for (int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); // 创建从线程 _loops[i] = _threads[i]->GetLoop(); // 主线程一定是在从线程创建出来之后才启动事件监控,获取新连接的 } } } // 返回下一个被分配的 EventLoop EventLoop *NextLoop() { if (_thread_count == 0) return _base_loop; // 如果从线程数量为 0 ,就返回主线程的 EventLoop _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; } }; ``` # 十三、TcpServer 模块 ## 13.1 模块简介 这个模块是对之前所有模块的一个整合,通过 **TcpServer** 模块实例化的对象,可以非常简单的完成一个服务器的搭建。 该模块中主要封装了一下内容: * 一个 **Acceptor** 对象,用来创建一个监听套接字,实现对监听套接字的管理 * 一个 **EventLoop** 对象,该 **EventLoop** 对象和主线程关联,在主从 Reactor 模型下,该 **EventLoop** 用来实现对监听套接字的事件监控,主要是获取新连接,然后将新连接分配到从属 Reactor 线程池中。在单 Reactor 模型下,该 EventLoop 对象负责所有文件描述符的监控,包括监听套接字和所有连接套接字 * 一个 **Hash 表**,管理当前服务器上的所有连接 * 一个 **LoopThreadPool** 对象,创建从属 Reactor 线程池,对新建连接进行事件监控以及处理 ## 13.2 设计思想 * 在 **TcpServer** 中实例化一个 **Acceptor** 对象,以及一个 **EventLoop** 对象(主 Reactor) * 将 **Acceptor** 挂接到 **主 Reactor** 上进行事件监控(在 TcpServer 对象创建的时候在构造函数中就已经完成了挂接) * 一旦 **Acceptor** 对象触发了可读事件,则执行读事件回调函数获取新建连接 * 在 Acceptor 的读事件回调函数中,创建一个 **Connection** 对象对连接进行管理,设置连接不同阶段对应的回调函数(连接建立完成的回调、读取到数据的回调、连接关闭的回调、任意事件的回调),启动对 **Connection** 的非活跃连接的超时销毁,将连接对象加入到 **Hash 表** 管理起来。 * 将新连接对应的 **Connection** 挂接到 **LoopThreadPool** 中的从属线程对应的 **EventLoop** 中进行事件监控(这个在创建 Connection 对象的时候就已经和线程池中的一个 EventLoop 进行了关联)。 * 启动 **TcpServer**,主要分两步,根据设置的从属 Reactor 数量,创建一批从属线程,从属线程创建出来后,会立即开始进行事件监控,然后启动主 Reactor 的事件监控(`_baseloop.Start()`),来获取新连接。 * 一旦 **Connection** 对应的连接触发了可读事件,这是执行读事件回调,读取数据,紧接着调用用户设置的新数据到来的回调函数,进行业务处理。处理完之后,调用 **Connection** 中的 `Send` 接口,将结果返回给客户端。 ## 13.3 模块实现 ```c++ class TcpServer { private: uint64_t _next_id; // 一个自动增长的 id uint16_t _port; // 服务器要监听的端口 int _timeout; // 非活跃连接的统计时间---多长时间无通信就是非活跃连接 bool _enable_inactive_release; // 是否启动了非活跃连接超时销毁的标志 EventLoop _base_loop; // 这是主线程的 EventLoop 对象,负责监听事件的处理 Acceptor _acceptor; // 监听套接字的管理对象 LoopThreadPool _pool; // 从属 EventLoop 线程池 std::unordered_map _connes; // 保存管理所有连接对应的 shared_ptr 对象 using ConnectionCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; ConnectionCallback _connection_callback; // 连接建立成功之后的回调函数 MessageCallback _message_callback; // 读取到数据后的业务处理回调函数 ClosedCallback _closed_callback; // 连接关闭后的回调函数 AnyEventCallback _anyevent_callback; // 任意事件触发的回调函数 private: // 新连接到来的回调函数-----为新连接创建一个 Connection 对象 void NewConnection(int fd) { ++_next_id; SPtrConnection con(new Connection(_pool.NextLoop(), _next_id, fd)); con->SetConnectionCallback(_connection_callback); con->SetMessageCallback(_message_callback); con->SetClosedCallback(_closed_callback); con->SetAnyEventCallback(_anyevent_callback); con->SetServerCloseCallback(std::bind(&TcpServer::RemoveConnectionInLoop, this, std::placeholders::_1)); if (_enable_inactive_release) con->EnableInactiveRelease(_timeout); // 启动非活跃连接定时销毁 con->Established(); // 启动读事件监控 _connes.insert(std::make_pair(_next_id, con)); } // 连接断开后的回调函数-----将该连接对应的 Connection 对象从 _connes 中移除 void RemoveConnectionInLoop(const SPtrConnection &con) { uint64_t id = con->Id(); auto it = _connes.find(id); if (it != _connes.end()) _connes.erase(id); return; } // 连接断开后的回调函数-----将该连接对应的 Connection 对象从 _connes 中移除 void RemoveConnection(const SPtrConnection &con) { return _base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, con)); } // 添加一个定时任务 void RunAfterInLoop(const task_func_t &task, int timeout) { _next_id++; return _base_loop.AddTimerTask(_next_id, task, timeout); } public: TcpServer(int port) : _next_id(0), _port(port), // 设置服务器的端口号 _timeout(0), // 设置定时销毁的超时时间 _enable_inactive_release(false), // 设置是否启动定时销毁任务的标志 _acceptor(&_base_loop, _port), // 为服务器管理一个监听套接字 _pool(&_base_loop) // 创建从属线程池 { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen(); // 启动对监听套接字的读事件监控 } // 设置从属线程的数量 void SetThreadCount(int count) { return _pool.SetThreadCount(count); } // 设置四个阶段的回调函数 void SetConnectionCallback(const ConnectionCallback &con_cb) { _connection_callback = con_cb; } void SetMessageCallback(const MessageCallback &mes_cb) { _message_callback = mes_cb; } void SetClosedCallback(const ClosedCallback &clo_cb) { _closed_callback = clo_cb; } void SetAnyEventCallback(const AnyEventCallback &any_cb) { _anyevent_callback = any_cb; } // 启动定时任务销毁功能 void EnableInactiveReleas(int timeout) { _enable_inactive_release = true; _timeout = timeout; return; } // 添加一个定时任务 void RunAfter(const task_func_t &task, int timeout) { return _base_loop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, timeout)); } // 启动服务器 void Start() { _pool.Create(); _base_loop.Start(); } }; ``` # 十四、一个简单的 EchoServer 基于上面写的内容,实现一个简单的回显服务器 **EchoServer** ```c++ // EchoServer.hpp #include "../../source/Server.hpp" class EchoServer { private: TcpServer _tcpserver; private: void OnConnected(const SPtrConnection &con) { DBG_LOG("get a connection: %p", con.get()); } void OnMessage(const SPtrConnection &con, Buffer *in_buffer) { DBG_LOG("recv a message: %s", in_buffer->ReadPos()); con->Send(in_buffer->ReadPos(), in_buffer->ReadableSize()); in_buffer->MoveReadIndex(in_buffer->ReadableSize()); con->Shutdown(); } void OnClose(const SPtrConnection &con) { DBG_LOG("connection: %p close", con.get()); } public: EchoServer(int port) : _tcpserver(port) { _tcpserver.SetConnectionCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _tcpserver.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); _tcpserver.SetClosedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _tcpserver.SetThreadCount(2); } void Start() { _tcpserver.Start(); } }; ``` ```c++ // main.cc #include "EchoServer.hpp" int main() { EchoServer echo_sv(8080); echo_sv.Start(); return 0; } ``` ## 14.2 模块关系图 ![EchoServer模块关系图](https://gitee.com/dinosaur-security/img/raw/master/img/EchoServer模块关系图.jpg) # 十五、压力测试 借助 **WebBench** 来构建大量的请求。WebBench 会创建大量的进程,在进程中,创建客户端连接服务器,发送请求,收到相应后关闭连接,开始下一个连接的建立。 **测试环境**: 服务器为 **4核4G不限带宽**的虚拟机,服务端创建了**一个主 Reactor 线程**,**三个从 Reactor 线程**。客户端使用 WebBench 进行本地测试,创建了 **8000 个进程**,每个进程连续发送请求,**持续 60s**。 **测试结果**: 并发量为 $8000$,QPS(每秒请求数)大约为 $3000 page/s$ ![测试1](https://gitee.com/dinosaur-security/img/raw/master/img/测试1.png) ![测试二](https://gitee.com/dinosaur-security/img/raw/master/img/测试二.png) ![测试四](https://gitee.com/dinosaur-security/img/raw/master/img/测试四.png) ![测试五](https://gitee.com/dinosaur-security/img/raw/master/img/测试五.png) ![测试三](https://gitee.com/dinosaur-security/img/raw/master/img/%E6%B5%8B%E8%AF%95%E4%B8%89.png) 可以发现,当从Reactor线程的数量达到 3 个后,再增加从 Reactor 得到的收益就不那么明显了。可能原因是收到客户端数量的限制,也可能是多线程切换带来的开销。