# threadPool
**Repository Path**: FengLY_NPU/thread-pool
## Basic Information
- **Project Name**: threadPool
- **Description**: 对网上一个写的很浅显易懂的线程池代码进行整理注释
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-01-25
- **Last Updated**: 2021-03-17
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 前言
对[github_Code](https://github.com/mtrebi/thread-pool)的总结整理
# Table of Contents
[Introduction](https://github.com/mtrebi/thread-pool/blob/master/README.md#introduction)
[Build instructions](https://github.com/mtrebi/thread-pool/blob/master/README.md#build-instructions)
[Thread pool](https://github.com/mtrebi/thread-pool/blob/master/README.md#thread-pool)
[Queue](https://github.com/mtrebi/thread-pool/blob/master/README.md#queue)
[Submit function](https://github.com/mtrebi/thread-pool/blob/master/README.md#submit-function)
[Thread worker](https://github.com/mtrebi/thread-pool/blob/master/README.md#thread-worker)
[Usage example](https://github.com/mtrebi/thread-pool/blob/master/README.md#usage-example)
[Use case#1](https://github.com/mtrebi/thread-pool#use-case-1)
[Use case#2](https://github.com/mtrebi/thread-pool#use-case-2)
[Use case#3](https://github.com/mtrebi/thread-pool#use-case-3)
[Future work](https://github.com/mtrebi/thread-pool/blob/master/README.md#future-work)
[References](https://github.com/mtrebi/thread-pool/blob/master/README.md#references)
# 介绍:
线程池是一种技术,它允许开发人员以简单有效的方式利用现代处理器的并发性。在使用线程池的时候,只需要将需要完成的工作提交到线程池中,并且以某种方式在不阻塞主线程的情况下完成这项工作。这是有效的,因为每次我们希望完成工作时线程都不会被初始化。线程初始化一次,并保持非活动状态,直到某些工作必须完成。这样我们可以最小化开销。
构建服务器应用的一个简单模型是,每当一个请求到达就创建一个新线程,而这样的缺陷也十分明显,每有一个请求就创建一个线程,这样所产生的资源消耗是非常巨大的,而在实际应用的大部分情况中,这些请求处理的时间很短但数量却是无比巨大的,除创建线程外,销毁线程也需要消耗系统资源,而系统中往往还有许多活动线程占用系统资源,因而可能会导致过度消耗内存或“切换过度”,因而,服务器需要一些方法来限制任意给定时刻的请求数目。
总结以下线程池的优点:
(1)因为在请求时,线程已经存在,从而消除了线程创建所带来的延迟
(2)通过调整线程池中的线程数目,可以有效防止资源不足
# 构建说明:
此线程池代码主要基于C++11的thread对象,可以支持跨平台的实现:
```
// windows
cd
mkdir build
cd build/
cmake .. "Visual Studio 15 2017 Win64"
```
```c
// Linux
cd
mkdir build
cd build/
cmake ..
make
```
# Thread pool
下面是维基百科给出的线程池的图片:
主要有三个部分组成:
* *Tasks Queue*. 任务队列中存储的是将要被执行的任务。
* *Thread Pool*. 这是一组线程,它们将不断的从队列中取出工作并且执行
* *Completed Tasks*. 当线程已经完成的工作,对应的工作返回其对应的结果,并且通知当前的工作已经完成
## Queue
因为队列满足先进先出的特性,因此我们可以将我们提交的任务,按照我们提交的顺序进行执行。但是当在从队列中取出元素的时候就不可避免的遇到这样一个问题:如果有两个线程在同一时间试图做同样的工作,会发生什么呢,这会导致整个程序都crash。
为了避免这样的问题,我们在标准C++ Queue的基础上实现了一个包装器,该包装器使用互斥量对并发访问进行限制。下面是实现的一个SafeQueue类的小例子:
```c
void enqueue(T& t) {
std::unique_lock lock(m_mutex);
m_queue.push(t);
}
```
通过互斥量的方式,我们可以确保在将任务push到队列的时候,没有其他线程访问这个队列。通过这种方式,我们使任务队列是相对安全的,不必担心许多线程同时访问或者修改。
## Submit function
线程池中最重要的一个方法是负责向队列中添加对应的任务,这个方法可以称之为“提交”。他的工作原理是相对比较简单的,但是实现起来就比较困难。主要原因有以下几点:
* 提交任务必须接受任何带参数的函数
* 提交之后需要立即返回,但是在返回的结果中需要有任务完成之后的结果,这里就必须引入异步的概念。
### Submit implementation
完整的提交函数如下所示:
主要用到了现代C++中的可变参数模板。
```c
// Submit a function to be executed asynchronously by the pool
template
auto submit(F&& f, Args&&... args) -> std::future {
// Create a function with bounded parameters ready to execute
std::function func = std::bind(std::forward(f), std::forward(args)...);
// Encapsulate it into a shared ptr in order to be able to copy construct / assign
auto task_ptr = std::make_shared>(func);
// Wrap packaged task into void function
std::function wrapper_func = [task_ptr]() {
(*task_ptr)();
};
// Enqueue generic wrapper function
m_queue.enqueue(wrapperfunc);
// Wake up one thread if its waiting
m_conditional_lock.notify_one();
// Return future from promise
return task_ptr->get_future();
}
```
上面的代码看上去很复杂,下面我们逐步解释一下每一行的意思:
#### 可变参数模板功能 Variadic template function
```c
template
```
上面的代码主要用到了参数化的模板,第一个模板参数F(我们的函数), 第二个是参数包,参数包是一个特殊的模板参数,它可以接受零个或者多个模板参数。这是在实际中实现模板参数的一种比较常见的方式,具有至少一个参数包的模板称为参数模板。
总结一下这句话,它告诉编译器,submit函数将接受一个f类型的的通用参数,和一个参数组args(函数f的参数)
***补充知识:***
变长参数模板可以接受不同长度的参数长度
下面的例子中主要介绍的就是参数的变长,以及展开
```c
template class Magic;
template
void magic(Ts... args){
std::cout << "args size " << sizeof...(args) << std::endl;
}
//方法1:使用递归进行迭代展开
//展开当前的递归模板
template
void printf1(T0 value){
std::cout << value << endl;
}
template
void printf1(T0 value, Ts... args){
std::cout << value << std::endl;
printf1(args...);
}
// template
// void printf2(T0 t0, T... t){
// std::cout << t0 << std::endl;
// if (sizeof...(t) > 0) printf2(t...); //这里只有C++17支持
// }
int main(){
magic(); //输出0;
magic(1); //输出1;
magic(1," "); //输出2
printf1(1, 2, 3, "123", 4, 5, 6);
// printf2(1, 2, 3, "123", 4, 5, 6);
return 0;
}
```
获取参数列表的方式主要是递归的方式.C++17提供了变参模板展开,但是这里不再赘述,可以参考现代C++这本书
#### Function declaration
```c
auto submit(F&& f, Args&&... args) -> std::future {
```
这种方法看起来可能有些奇怪,因为平时编写代码的时候都是编写以下这种方式:
```c
return-type identifier ( argument-declarations... )
```
但是现在编写的代码中却使用auto来自动判断最后的返回类型
```c
auto identifier ( argument-declarations... ) -> return_type
```
那么为什么要使用自动推导的返回结果呢?假设有一个函数,它的返回类型取决于函数的输入参数.使用第一种语法,由于返回类型的声明是在参数类型之前,因此编译器就不能判断当前的返回类型具体是什么.
使用第二种语法,自动判断当前返回类型的方式,先将返回类型声明为auto,然后使用->可以根据前面声明的参数类型确定返回值.
现在解释一下submit函数的参数,当一个参数的类型被声明为T&&的时候,对于传入的参数而言,这是一个通用的引用,因为T&&可以表示右值引用, 通过模板类型最大的好处就是,这样不会让左值引用一样,只能绑定到常量对象,和右值引用只能绑定右值,这里可以参考[Scott Meyers](https://isocpp.org/blog/2012/11/universal-references-in-c11-scott-meyers)
返回类型是 **std::future**. std::future是一种特殊的类型,它提供了一种机制,来访问异步操作的结果.
最后,std::future的模板类型是decltype(f(args ...)).decltype是一个特殊的C++关键字,用于检查实体的声明类型或者表达式的类型, 在我们的例子中,我们想知道函数f的返回类型,所以使用泛型函数f和参数组args
#### 函数主体 Function body
```c
// Create a function with bounded parameters ready to execute
std::function func = std::bind(std::forward(f), std::forward(args)...);
```
这段代码需要额外关注一下,首先,std::bind(F, args)可以将调用的参数和函数包装器F绑定在一起,但是在这里,只是简单地使用泛型函数f和参数pack argc调用bind, 但是对每一个参数使用另一个包装器std::forward(t), 这个包装器可以实现通用引用的完美转发。对于std::function是用来封装函数的C++对象。它可以像正常函数一样去执行,但是因为他是一个对象,所以我们可以存储,复制,移动这个对象。因为我们将所有的参数 args 绑定到函数 f 上,所以我们只需要添加一对表示参数空列表的空括号: decltype (f (args...))()。
```c
// Encapsulate it into a shared ptr in order to be able to copy construct / assign
auto task_ptr = std::make_shared>(func);
```
接下来我们要做的就是创建一个std::packaged_task(t), 一个packaged_task 是一个可以异步执行的函数的包装器。其结果存储在std::future对象的共享状态中。因为函数f的返回值是decltype (f (args...))() ,它与打包的任务类型相同,然后我们只需要使用std::make_shared将这个打包的任务再次打包到std::shared_ptr中即可。
```c
// Wrap packaged task into void function
std::function wrapperfunc = [task_ptr]() {
(*task_ptr)();
};
```
同样,我们创建一个std::function, 但是需要注意的是这次他的模板类型是void(), 与函数f以及其参数无关,返回类型总是void。 做这一步的主要目的是因为,将它们存储在容器中的唯一方法是通过使用void函数对他们进行包装。在这里我们只是声明这个wrapperfunc来执行绑定func的实际任务task_ptr
```c
// Enqueue generic wrapper function
m_queue.enqueue(wrapperfunc);
```
上面的操作就是将需要执行的函数,push到队列中
```c
// Wake up one thread if its waiting
m_conditional_lock.notify_one();
```
在结束之前,为可以唤醒一个线程,以防止它正在等待。
```c
// Return future from promise
return task_ptr->get_future();
```
最后,我们返回打包任务的std::future对象。因为我们返回的future对象,已经绑定了,打包的任务,同时也绑定了函数func,所以执行这个taskptr将自动更新future。因为,我们将taskptr的执行包装成一个通用的包装函数,所以这里会自动异步的获取最后的结果。
## 线程的工作者 Thread worker
现在我们了解一下submit是如何工作的。接下来我们将关注工作是如何完成的。
一种最简单的方法是轮询当前的的队列,如果当前的任务是可以提交的,那么就执行当前的任务。
Loop
If Queue is not empty
Dequeue work
Do it
上面的实现方法看起来是可以实现的,但是却不怎么有效率,因为当线程是空的时候,线程会不断的循环并且一直询问,队列是空的吗?
一种更合理的方式是通过“休眠”线程来完成,直到一些工作被添加到队列中,正如之前写的那样,一旦我们对工作进行排队,就会发送一个notify_one()信号,这使得我们可以实现一个更高效的算法。
Loop
If Queue is empty
Wait signal
Dequeue work
Do it
这个个信号量可以通过C++的条件变量进行实现,条件变量总是绑定到互斥对象,所以可以在线程池类中添加一个互斥对象来管理整个线程池,一个可以实现的最终代码如下所示:
```c
void operator()() {
std::function func;
bool dequeued;
while (!m_pool->m_shutdown) {
{
std::unique_lock lock(m_pool->m_conditional_mutex);
if (m_pool->m_queue.empty()) {
m_pool->m_conditional_lock.wait(lock);
}
dequeued = m_pool->m_queue.dequeue(func);
}
if (dequeued) {
func();
}
}
}
```
func是std::function的一个包装:
```c
std::function wrapperfunc = [task_ptr]() {
(*task_ptr)();
};
```
# 使用的例子:Usage example
创建线程池
```c
// Create pool with 3 threads
ThreadPool pool(3);
// Initialize pool
pool.init();
```
如果想关闭线程池,可以调用下面的函数
```c
// Shutdown the pool, releasing all threads
pool.shutdown()
```
如果想发送一些任务到线程池,在初始化之后,需要调用submit函数。
```c
pool.submit(work);
```
根据工作的类型,我们区分了不同的用例,假设我们要做的工作是将两个数相乘,我们可以用许多不同的方法去做。下面是能实现比较常见的三种方法:
* Use-Case #1. 直接返回结果
* Use-Case #2. 通过传引用返回结果
* Use-Case #3. 函数打印结果
_Note: 这是只是为了展示提交函数的工作原理。选项并不是唯一的。
## Use-Case #1
带有返回值的multiply函数
```c
// Simple function that adds multiplies two numbers and returns the result
int multiply(const int a, const int b) {
const int res = a * b;
return res;
}
```
submit:
```c
// The type of future is given by the return type of the function
std::future future = pool.submit(multiply, 2, 3);
```
可以方便的使用auto获取结果
```c
auto future = pool.submit(multiply, 2, 3);
```
当线程池完成后,可以异步的调用任务
```c
const int result = future.get();
std::cout << result << std::endl;
```
使用future的get方法,可以得到对应的 int类型
## Use-Case #2
传引用的multiply函数
```c
// Simple function that adds multiplies two numbers and updates the out_res variable passed by ref
void multiply(int& out_res, const int a, const int b) {
out_res = a * b;
}
```
现在,我们必须调用 submit 函数,但有细微的差别。因为我们使用模板和类型演绎(通用引用) ,需要使用 std: ref (param)调用 ref 传递的参数,以确保我们是通过 ref 而不是通过值传递的。
```c
int result = 0;
auto future = pool.submit(multiply, std::ref(result), 2, 3);
// result is 0
future.get();
// result is 6
std::cout << result << std::endl;
```
在这种情况下,future返回的是void类型,但是依然需要调用get方法
## Use-Case #3
最后一个例子是最简单的,我们的乘法函数只是输出结果:
```c
// Simple function that adds multiplies two numbers and prints the result
void multiply(const int a, const int b) {
const int result = a * b;
std::cout << result << std::endl;
}
```
```c
auto future = pool.submit(multiply, 2, 3);
future.get();
```
在这种情况下,我们知道乘法一完成就会打印出来。
Checkout the [main](https://github.com/mtrebi/thread-pool/blob/master/src/main.cpp) program for a complete example.
# Future work
* Make it more reliable and safer (exceptions)
* Find a better way to use it with member functions (thanks to @rajenk)
* Run benchmarks and improve performance if needed
* Evaluate performance and impact of std::function in the heap and try alternatives if necessary. (thanks to @JensMunkHansen)
# References
* [MULTI-THREADED PROGRAMMING TERMINOLOGY - 2017](http://www.bogotobogo.com/cplusplus/multithreaded.php): Fast analysis of how a multi-thread system works
* [Universal References in C++11—Scott Meyers](https://isocpp.org/blog/2012/11/universal-references-in-c11-scott-meyers): Universal references in C++11 by Scott Meyers
* [Perfect forwarding and universal references in C++](http://eli.thegreenplace.net/2014/perfect-forwarding-and-universal-references-in-c/): Article about how and when to use perfect forwarding and universal references
* [C++ documentation](http://www.cplusplus.com/reference/): Thread, conditional variables, mutex and many others...