# threadPool **Repository Path**: FengLY_NPU/thread-pool ## Basic Information - **Project Name**: threadPool - **Description**: 对网上一个写的很浅显易懂的线程池代码进行整理注释 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-01-25 - **Last Updated**: 2021-03-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 前言 对[github_Code](https://github.com/mtrebi/thread-pool)的总结整理 # Table of Contents  [Introduction](https://github.com/mtrebi/thread-pool/blob/master/README.md#introduction)
 [Build instructions](https://github.com/mtrebi/thread-pool/blob/master/README.md#build-instructions)
 [Thread pool](https://github.com/mtrebi/thread-pool/blob/master/README.md#thread-pool)
        [Queue](https://github.com/mtrebi/thread-pool/blob/master/README.md#queue)
        [Submit function](https://github.com/mtrebi/thread-pool/blob/master/README.md#submit-function)
        [Thread worker](https://github.com/mtrebi/thread-pool/blob/master/README.md#thread-worker)
 [Usage example](https://github.com/mtrebi/thread-pool/blob/master/README.md#usage-example)
        [Use case#1](https://github.com/mtrebi/thread-pool#use-case-1)
        [Use case#2](https://github.com/mtrebi/thread-pool#use-case-2)
        [Use case#3](https://github.com/mtrebi/thread-pool#use-case-3)
 [Future work](https://github.com/mtrebi/thread-pool/blob/master/README.md#future-work)
 [References](https://github.com/mtrebi/thread-pool/blob/master/README.md#references)
# 介绍: 线程池是一种技术,它允许开发人员以简单有效的方式利用现代处理器的并发性。在使用线程池的时候,只需要将需要完成的工作提交到线程池中,并且以某种方式在不阻塞主线程的情况下完成这项工作。这是有效的,因为每次我们希望完成工作时线程都不会被初始化。线程初始化一次,并保持非活动状态,直到某些工作必须完成。这样我们可以最小化开销。 构建服务器应用的一个简单模型是,每当一个请求到达就创建一个新线程,而这样的缺陷也十分明显,每有一个请求就创建一个线程,这样所产生的资源消耗是非常巨大的,而在实际应用的大部分情况中,这些请求处理的时间很短但数量却是无比巨大的,除创建线程外,销毁线程也需要消耗系统资源,而系统中往往还有许多活动线程占用系统资源,因而可能会导致过度消耗内存或“切换过度”,因而,服务器需要一些方法来限制任意给定时刻的请求数目。 总结以下线程池的优点: (1)因为在请求时,线程已经存在,从而消除了线程创建所带来的延迟 (2)通过调整线程池中的线程数目,可以有效防止资源不足 # 构建说明: 此线程池代码主要基于C++11的thread对象,可以支持跨平台的实现: ``` // windows cd mkdir build cd build/ cmake .. "Visual Studio 15 2017 Win64" ``` ```c // Linux cd mkdir build cd build/ cmake .. make ``` # Thread pool 下面是维基百科给出的线程池的图片:

主要有三个部分组成: * *Tasks Queue*. 任务队列中存储的是将要被执行的任务。 * *Thread Pool*. 这是一组线程,它们将不断的从队列中取出工作并且执行 * *Completed Tasks*. 当线程已经完成的工作,对应的工作返回其对应的结果,并且通知当前的工作已经完成 ## Queue 因为队列满足先进先出的特性,因此我们可以将我们提交的任务,按照我们提交的顺序进行执行。但是当在从队列中取出元素的时候就不可避免的遇到这样一个问题:如果有两个线程在同一时间试图做同样的工作,会发生什么呢,这会导致整个程序都crash。 为了避免这样的问题,我们在标准C++ Queue的基础上实现了一个包装器,该包装器使用互斥量对并发访问进行限制。下面是实现的一个SafeQueue类的小例子: ```c void enqueue(T& t) { std::unique_lock lock(m_mutex); m_queue.push(t); } ``` 通过互斥量的方式,我们可以确保在将任务push到队列的时候,没有其他线程访问这个队列。通过这种方式,我们使任务队列是相对安全的,不必担心许多线程同时访问或者修改。 ## Submit function 线程池中最重要的一个方法是负责向队列中添加对应的任务,这个方法可以称之为“提交”。他的工作原理是相对比较简单的,但是实现起来就比较困难。主要原因有以下几点: * 提交任务必须接受任何带参数的函数 * 提交之后需要立即返回,但是在返回的结果中需要有任务完成之后的结果,这里就必须引入异步的概念。 ### Submit implementation 完整的提交函数如下所示: 主要用到了现代C++中的可变参数模板。 ```c // Submit a function to be executed asynchronously by the pool template auto submit(F&& f, Args&&... args) -> std::future { // Create a function with bounded parameters ready to execute std::function func = std::bind(std::forward(f), std::forward(args)...); // Encapsulate it into a shared ptr in order to be able to copy construct / assign auto task_ptr = std::make_shared>(func); // Wrap packaged task into void function std::function wrapper_func = [task_ptr]() { (*task_ptr)(); }; // Enqueue generic wrapper function m_queue.enqueue(wrapperfunc); // Wake up one thread if its waiting m_conditional_lock.notify_one(); // Return future from promise return task_ptr->get_future(); } ``` 上面的代码看上去很复杂,下面我们逐步解释一下每一行的意思: #### 可变参数模板功能 Variadic template function ```c template ``` 上面的代码主要用到了参数化的模板,第一个模板参数F(我们的函数), 第二个是参数包,参数包是一个特殊的模板参数,它可以接受零个或者多个模板参数。这是在实际中实现模板参数的一种比较常见的方式,具有至少一个参数包的模板称为参数模板。 总结一下这句话,它告诉编译器,submit函数将接受一个f类型的的通用参数,和一个参数组args(函数f的参数) ***补充知识:*** 变长参数模板可以接受不同长度的参数长度 下面的例子中主要介绍的就是参数的变长,以及展开 ```c template class Magic; template void magic(Ts... args){ std::cout << "args size " << sizeof...(args) << std::endl; } //方法1:使用递归进行迭代展开 //展开当前的递归模板 template void printf1(T0 value){ std::cout << value << endl; } template void printf1(T0 value, Ts... args){ std::cout << value << std::endl; printf1(args...); } // template // void printf2(T0 t0, T... t){ // std::cout << t0 << std::endl; // if (sizeof...(t) > 0) printf2(t...); //这里只有C++17支持 // } int main(){ magic(); //输出0; magic(1); //输出1; magic(1," "); //输出2 printf1(1, 2, 3, "123", 4, 5, 6); // printf2(1, 2, 3, "123", 4, 5, 6); return 0; } ``` 获取参数列表的方式主要是递归的方式.C++17提供了变参模板展开,但是这里不再赘述,可以参考现代C++这本书 #### Function declaration ```c auto submit(F&& f, Args&&... args) -> std::future { ``` 这种方法看起来可能有些奇怪,因为平时编写代码的时候都是编写以下这种方式: ```c return-type identifier ( argument-declarations... ) ``` 但是现在编写的代码中却使用auto来自动判断最后的返回类型 ```c auto identifier ( argument-declarations... ) -> return_type ``` 那么为什么要使用自动推导的返回结果呢?假设有一个函数,它的返回类型取决于函数的输入参数.使用第一种语法,由于返回类型的声明是在参数类型之前,因此编译器就不能判断当前的返回类型具体是什么. 使用第二种语法,自动判断当前返回类型的方式,先将返回类型声明为auto,然后使用->可以根据前面声明的参数类型确定返回值. 现在解释一下submit函数的参数,当一个参数的类型被声明为T&&的时候,对于传入的参数而言,这是一个通用的引用,因为T&&可以表示右值引用, 通过模板类型最大的好处就是,这样不会让左值引用一样,只能绑定到常量对象,和右值引用只能绑定右值,这里可以参考[Scott Meyers](https://isocpp.org/blog/2012/11/universal-references-in-c11-scott-meyers) 返回类型是 **std::future**. std::future是一种特殊的类型,它提供了一种机制,来访问异步操作的结果. 最后,std::future的模板类型是decltype(f(args ...)).decltype是一个特殊的C++关键字,用于检查实体的声明类型或者表达式的类型, 在我们的例子中,我们想知道函数f的返回类型,所以使用泛型函数f和参数组args #### 函数主体 Function body ```c // Create a function with bounded parameters ready to execute std::function func = std::bind(std::forward(f), std::forward(args)...); ``` 这段代码需要额外关注一下,首先,std::bind(F, args)可以将调用的参数和函数包装器F绑定在一起,但是在这里,只是简单地使用泛型函数f和参数pack argc调用bind, 但是对每一个参数使用另一个包装器std::forward(t), 这个包装器可以实现通用引用的完美转发。对于std::function是用来封装函数的C++对象。它可以像正常函数一样去执行,但是因为他是一个对象,所以我们可以存储,复制,移动这个对象。因为我们将所有的参数 args 绑定到函数 f 上,所以我们只需要添加一对表示参数空列表的空括号: decltype (f (args...))()。 ```c // Encapsulate it into a shared ptr in order to be able to copy construct / assign auto task_ptr = std::make_shared>(func); ``` 接下来我们要做的就是创建一个std::packaged_task(t), 一个packaged_task 是一个可以异步执行的函数的包装器。其结果存储在std::future对象的共享状态中。因为函数f的返回值是decltype (f (args...))() ,它与打包的任务类型相同,然后我们只需要使用std::make_shared将这个打包的任务再次打包到std::shared_ptr中即可。 ```c // Wrap packaged task into void function std::function wrapperfunc = [task_ptr]() { (*task_ptr)(); }; ``` 同样,我们创建一个std::function, 但是需要注意的是这次他的模板类型是void(), 与函数f以及其参数无关,返回类型总是void。 做这一步的主要目的是因为,将它们存储在容器中的唯一方法是通过使用void函数对他们进行包装。在这里我们只是声明这个wrapperfunc来执行绑定func的实际任务task_ptr ```c // Enqueue generic wrapper function m_queue.enqueue(wrapperfunc); ``` 上面的操作就是将需要执行的函数,push到队列中 ```c // Wake up one thread if its waiting m_conditional_lock.notify_one(); ``` 在结束之前,为可以唤醒一个线程,以防止它正在等待。 ```c // Return future from promise return task_ptr->get_future(); ``` 最后,我们返回打包任务的std::future对象。因为我们返回的future对象,已经绑定了,打包的任务,同时也绑定了函数func,所以执行这个taskptr将自动更新future。因为,我们将taskptr的执行包装成一个通用的包装函数,所以这里会自动异步的获取最后的结果。 ## 线程的工作者 Thread worker 现在我们了解一下submit是如何工作的。接下来我们将关注工作是如何完成的。 一种最简单的方法是轮询当前的的队列,如果当前的任务是可以提交的,那么就执行当前的任务。 Loop If Queue is not empty Dequeue work Do it 上面的实现方法看起来是可以实现的,但是却不怎么有效率,因为当线程是空的时候,线程会不断的循环并且一直询问,队列是空的吗? 一种更合理的方式是通过“休眠”线程来完成,直到一些工作被添加到队列中,正如之前写的那样,一旦我们对工作进行排队,就会发送一个notify_one()信号,这使得我们可以实现一个更高效的算法。 Loop If Queue is empty Wait signal Dequeue work Do it 这个个信号量可以通过C++的条件变量进行实现,条件变量总是绑定到互斥对象,所以可以在线程池类中添加一个互斥对象来管理整个线程池,一个可以实现的最终代码如下所示: ```c void operator()() { std::function func; bool dequeued; while (!m_pool->m_shutdown) { { std::unique_lock lock(m_pool->m_conditional_mutex); if (m_pool->m_queue.empty()) { m_pool->m_conditional_lock.wait(lock); } dequeued = m_pool->m_queue.dequeue(func); } if (dequeued) { func(); } } } ``` func是std::function的一个包装: ```c std::function wrapperfunc = [task_ptr]() { (*task_ptr)(); }; ``` # 使用的例子:Usage example 创建线程池 ```c // Create pool with 3 threads ThreadPool pool(3); // Initialize pool pool.init(); ``` 如果想关闭线程池,可以调用下面的函数 ```c // Shutdown the pool, releasing all threads pool.shutdown() ``` 如果想发送一些任务到线程池,在初始化之后,需要调用submit函数。 ```c pool.submit(work); ``` 根据工作的类型,我们区分了不同的用例,假设我们要做的工作是将两个数相乘,我们可以用许多不同的方法去做。下面是能实现比较常见的三种方法: * Use-Case #1. 直接返回结果 * Use-Case #2. 通过传引用返回结果 * Use-Case #3. 函数打印结果 _Note: 这是只是为了展示提交函数的工作原理。选项并不是唯一的。 ## Use-Case #1 带有返回值的multiply函数 ```c // Simple function that adds multiplies two numbers and returns the result int multiply(const int a, const int b) { const int res = a * b; return res; } ``` submit: ```c // The type of future is given by the return type of the function std::future future = pool.submit(multiply, 2, 3); ``` 可以方便的使用auto获取结果 ```c auto future = pool.submit(multiply, 2, 3); ``` 当线程池完成后,可以异步的调用任务 ```c const int result = future.get(); std::cout << result << std::endl; ``` 使用future的get方法,可以得到对应的 int类型 ## Use-Case #2 传引用的multiply函数 ```c // Simple function that adds multiplies two numbers and updates the out_res variable passed by ref void multiply(int& out_res, const int a, const int b) { out_res = a * b; } ``` 现在,我们必须调用 submit 函数,但有细微的差别。因为我们使用模板和类型演绎(通用引用) ,需要使用 std: ref (param)调用 ref 传递的参数,以确保我们是通过 ref 而不是通过值传递的。 ```c int result = 0; auto future = pool.submit(multiply, std::ref(result), 2, 3); // result is 0 future.get(); // result is 6 std::cout << result << std::endl; ``` 在这种情况下,future返回的是void类型,但是依然需要调用get方法 ## Use-Case #3 最后一个例子是最简单的,我们的乘法函数只是输出结果: ```c // Simple function that adds multiplies two numbers and prints the result void multiply(const int a, const int b) { const int result = a * b; std::cout << result << std::endl; } ``` ```c auto future = pool.submit(multiply, 2, 3); future.get(); ``` 在这种情况下,我们知道乘法一完成就会打印出来。 Checkout the [main](https://github.com/mtrebi/thread-pool/blob/master/src/main.cpp) program for a complete example. # Future work * Make it more reliable and safer (exceptions) * Find a better way to use it with member functions (thanks to @rajenk) * Run benchmarks and improve performance if needed * Evaluate performance and impact of std::function in the heap and try alternatives if necessary. (thanks to @JensMunkHansen) # References * [MULTI-THREADED PROGRAMMING TERMINOLOGY - 2017](http://www.bogotobogo.com/cplusplus/multithreaded.php): Fast analysis of how a multi-thread system works * [Universal References in C++11—Scott Meyers](https://isocpp.org/blog/2012/11/universal-references-in-c11-scott-meyers): Universal references in C++11 by Scott Meyers * [Perfect forwarding and universal references in C++](http://eli.thegreenplace.net/2014/perfect-forwarding-and-universal-references-in-c/): Article about how and when to use perfect forwarding and universal references * [C++ documentation](http://www.cplusplus.com/reference/): Thread, conditional variables, mutex and many others...