github链接
在学习多线程的时候必不可少的一个知识点就是线程池,在web服务器中我们就常常会遇见它。在c++中没有标准的线程池,但这个轮子其实早就被大牛们造好了,现在我们就来看看这个github高星线程池的实现。
workers | tasks |
---|---|
线程vector(消费者) | 任务队列(生产者) |
-
接口
- ThreadPool(size_t);
- enqueue(F &&f, Args &&… args);
- ~ThreadPool();
用户通过enqueue来传入任务函数和参数,并可在接口的返回值来获取任务函数的返回值。
-
线程池通过锁和条件变量来进行同步,stop来表示线程池析构,本线程池各个线程会将所有任务做完再结束,类似于TCP LINGER。
class ThreadPool {
public:
ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
如下图,构造函数中,在thread向量中用lamada表达式添加thread函数,这个函数会在while循环中等待任务并执行任务。condition 等待stop条件为真,或者tasks.empty()为假。stop只有在线程池析构的时候才会为真,等线程池的用户向tasks中添加任务函数,线程苏醒,并从队列首部取出“移出”(std::move) 任务并执行。
当然现在线程池中没有任务,threads个子线程都在挂起态。
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
/* 向thread vector
* 中加入threads个线程,which 等待 stop信号或者tasks任务队列中有任务 */
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
/* 注意这里是一个lockguard*/
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] {
return this->stop || !this->tasks.empty(); });
/* 醒来以后看是否需要停止,如果任务队列非空!
*可以发现它的处理中必须将任务队列所有的任务处理完才退出 */
if (this->stop && this->tasks.empty())
return;
/* 从任务队列pop front,执行任务 */
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
下图析构函数中,我们加锁后将stop置为true,并通知所有等待中的线程苏醒。
并等待所有的线程结束。在上图中也可见只有stop为真并且任务队列tasks为空才会return,这意味着这些线程会先去处理了剩余的任务,才会结束。当然在其他版本的线程池中,完全可以直退出,而不执行任务。我觉得吧,这样做是为了更加优雅的处理所有任务,而非丢弃一些任务,尤其是前后任务有所关联的情况下。(和tcp linger很像)
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
/* 此时所有的任务必须全部做完才能退出 */
for (std::thread &worker : workers)
worker.join();
}
在添加任务的enqueue函数中会有一堆c++11并发有关的名词需要你去认识。
std::result_of
获得一个函数的返回值std::forward
保持参数的左右值属性,实现完美转发std::bind
将函数通过适配器的方式绑定,生成一个新的可调用对象来“适应”原对象的参数列表。std::future
c++11的一种新同步机制中作为沟通桥梁,可配合std::promise,std::packaged_task,std::async等来实现多线程间函数异步调用的功能。std::packaged_task
可传入一个可调用对象/函数和参数,并可以用get_future()来返回一个std::future对象,可以通过它来确定调用函数的返回值以表示函数“就绪”(已完成),实现“同步”。传入可以通过这个future.get()来获取执行函数的返回值,当然这个函数如果没有执行完毕,此时future.get()会阻塞等待。
首先我们可以看到enqueue的返回值是一个future,用户可以通过这个future.get()获得函数的返回值,保证任务函数已就绪。当然毕竟是个阻塞操作,我觉得服务器应该是不会轻易去调用这个get的。我们可以看到函数的返回值类型typename std::result_of<F(Args...)>::type>
获得是任务函数F的返回值类型。如函数是int func(long,char)
则获得的type是int,这符合我们的预期。
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...))
接着我们用make_shared
去获得一个传入函数和参数的packaged_task的智能指针。
再用packaged_task.get_future()获取std::future类型的res,并在enqueue函数最后作为返回值返回给用户。
然后再在临界区加锁并添加任务(又用lamada表达式,让packaged_task执行其任务函数,以lamada函数的形式去适配任务队列的类型std::function<void()>,std::queue<std::function<void()>> tasks
),并用条件变量通知挂起的线程执行任务。
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_其shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() {
(*task)(); });
}
condition.notify_one();
return res;
}
到此线程池内部就已经一览无余了。
测试代码中,我们用results来存储每个任务返回的future,这些任务将会在线程池中的各个线程中执行,用户从个result.get()去获得这些函数的返回值,当然如果传入函数的返回值是void,那调用std::future<int>.get()
也一样表示任务函数就绪。
#include "ThreadPool.h"
#include <chrono>
#include <iostream>
#include <vector>
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(pool.enqueue([i] {
// std::cout << "hello " << i << std::endl;
std::cout << "begin:" << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "end" << std::this_thread::get_id() << std::endl;
// std::cout << "world " << i << std::endl;
// std::cout<<std::this_thread::get_id()<<std::endl;
return i * i;
}));
}
// std::cout<<"there"<<std::endl;
for (auto &&result : results) {
std::cout << "wait for the ret " << std::endl;
std::cout << result.get() << ' ' << std::endl;
// std::cout << std::endl;
}
/* int x = 1;
auto res = pool.enqueue([](int &y) { y = 99; }, std::ref(x));
//auto res = pool.enqueue([](int &y) { y = 99; }, x);
res.get();
std::cout<<x<<std::endl;
return 0;
*/}
可以看到上方有我注释掉的一段代码,是和std::bind有关的一个坑,如果使用x和使用std::ref(x)是两种结果,这和std::bind的拷贝行为有关,需要用std::ref间接传入“左值”。
std::ref
源码
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class ThreadPool {
public:
ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
/* 向thread vector
* 中加入threads个线程,which 等待 stop信号或者tasks任务队列中有任务 */
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
/* 注意这里是一个lockguard*/
this->condition.wait(
lock, [this] {
return this->stop || !this->tasks.empty(); });
/* 醒来以后看是否需要停止,如果任务队列非空!
*可以发现它的处理中必须将任务队列所有的任务处理完才退出 */
if (this->stop && this->tasks.empty())
return;
/* 从任务队列pop front,执行任务 */
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() {
(*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
/* 此时所有的任务必须全部做完才能退出 */
for (std::thread &worker : workers)
worker.join();
}
#endif