Original article: Implementing a Thread Pool in C++
Introduction
A thread pool is an effective way to raise CPU utilization: a fixed set of threads is created up front, and each thread runs tasks as they arrive and blocks while the queue is empty.
Compared with spawning a thread per task, a thread pool is simpler and more modular, and it is more efficient because threads are not repeatedly created and destroyed.
Prerequisites
Asynchronous tasks (std::future, std::packaged_task, etc.): launching work whose result is collected asynchronously (a combined sketch of these pieces follows this list)
Callable wrappers (std::function + std::bind): wrapping and type-erasing functions for generic code
std::condition_variable: coordinating how the pool dispatches and executes tasks
std::shared_ptr: managing the lifetime of queued tasks
Type deduction and trailing return types: returning the correct, statically unknown type from a generic function
Perfect forwarding: passing arguments through a wrapper without losing their value category
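Before looking at the pool itself, here is a minimal sketch (not from the original article) that ties several of these pieces together: a hypothetical helper named run_async that wraps an arbitrary callable in a std::packaged_task via std::bind, deduces its result type with a trailing return type, and perfectly forwards its arguments. The pool's enqueue below follows the same pattern, except that it queues the task instead of spawning a thread per call.

```cpp
#include <functional>
#include <future>
#include <iostream>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical helper: wrap a callable plus its arguments into a packaged_task,
// run it on a detached thread, and hand back a future for the result.
// The trailing return type with std::result_of deduces the callable's result,
// and std::forward preserves the value category of f and args (perfect forwarding).
template<typename F, typename... Args>
auto run_async(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    // std::bind folds the callable and its arguments into a single
    // zero-argument callable that the packaged_task can own.
    std::packaged_task<return_type()> task(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<return_type> res = task.get_future();
    std::thread(std::move(task)).detach();   // fire-and-forget worker
    return res;                              // caller collects the result later
}

int main() {
    auto fut = run_async([](int a, int b) { return a + b; }, 2, 3);
    std::cout << "result=" << fut.get() << "\n";   // prints result=5
    return 0;
}
```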
Thread pool logic
```cpp
#include <iostream>
#include <queue>
#include <functional>
#include <condition_variable>
#include <vector>
#include <thread>
#include <future>
#include <atomic>

class ThreadPools {
public:
    ThreadPools(int n_threads) : stop(false) {
        // Pre-create n_threads workers; each one loops forever pulling tasks.
        for (int i = 0; i < n_threads; i++) {
            workers.emplace_back([this]() {
                while (true) {
                    int task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        // Sleep until there is work or the pool is shutting down.
                        this->cv.wait(lock, [this]() { return this->stop || !this->tasks.empty(); });
                        // Exit only when stopping AND the queue has been drained.
                        if (this->stop && this->tasks.empty()) return;
                        task = this->tasks.front();
                        this->tasks.pop();
                    }
                    std::cout << "run task:" << task << "\n";
                }
            });
        }
    }

    ~ThreadPools() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();   // wake every worker so it can drain the queue and exit
        for (std::thread &work : workers) {
            work.join();
        }
        std::cout << "thread pools final stop\n";
    }

    void enqueue(int task) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            if (stop) {
                std::cout << "push to a stop pools\n";
                return;
            }
            tasks.push(task);
        }
        std::cout << "push task:" << task << "\n";
        cv.notify_one();   // wake one worker for the new task
    }

private:
    std::vector<std::thread> workers;
    std::queue<int> tasks;
    std::condition_variable cv;
    std::mutex mtx;
    std::atomic<bool> stop;
};

int main() {
    ThreadPools pools(2);
    for (int i = 0; i < 10; i++) pools.enqueue(i);
    return 0;   // the destructor drains the queue and joins the workers
}
```
A simple thread pool
Implement a thread pool that runs three kinds of tasks: tasks with arguments and no return value, tasks with arguments and a return value, and tasks with arguments that return a message struct.
```cpp
#include <thread>
#include <functional>
#include <future>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <mutex>
#include <atomic>
#include <vector>
#include <queue>
#include <type_traits>
#include <string>
#include <cstring>

class ThreadPools {
public:
    ThreadPools(int n_threads) : n_threads(n_threads), stop(false) {
        for (int i = 0; i < n_threads; i++) {
            workers.emplace_back([this]() {
                while (true) {
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock, [this]() { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();   // run the task outside the lock
                }
            });
        }
    }

    ~ThreadPools() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (std::thread &worker : workers) {
            worker.join();
        }
    }

    // Wrap any zero-argument callable, queue it, and return a future for its result.
    template<typename F>
    auto enqueue(F&& f) -> std::future<typename std::result_of<F()>::type>;

private:
    int n_threads;
    std::atomic<bool> stop;
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<void()>> tasks;
    std::condition_variable cv;
    std::mutex mtx;
};

template<typename F>
auto ThreadPools::enqueue(F&& f) -> std::future<typename std::result_of<F()>::type> {
    using return_type = typename std::result_of<F()>::type;
    // shared_ptr keeps the packaged_task alive until the queued lambda has run.
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(mtx);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task]() { (*task)(); });
    }
    cv.notify_one();
    return res;
}

class Message {
public:
    Message(int fd, int from, int to, std::string& content)
        : fd(fd), from(from), to(to), content(content) {
        size = content.size();
    }
    int fd;
    int from;
    int to;
    int size;
    std::string content;
};

int main() {
    ThreadPools tp(3);

    // 1) Tasks with arguments and no return value
    for (int i = 0; i < 10; i++) {
        tp.enqueue([i]() { std::cout << "task " << i << "\n"; });
    }

    // 2) Tasks with arguments and a return value
    std::vector<std::future<int>> results;
    for (int i = 0; i < 10; i++) {
        auto f = [](int i) { return i * i; };
        std::function<int()> bf = std::bind(f, i);
        results.push_back(tp.enqueue(bf));
    }
    for (auto &result : results) {
        std::cout << "get result=" << result.get() << "\n";
    }

    // 3) Tasks with arguments that return a message struct
    std::vector<std::future<Message>> msgs;
    for (int i = 0; i < 10; i++) {
        auto f = [](int i) {
            std::string m = "a message from:";
            m += std::to_string(i);
            Message msg(i, i, i, m);
            return msg;
        };
        std::function<Message()> bf = std::bind(f, i);
        msgs.push_back(tp.enqueue(bf));
    }
    for (auto &msg : msgs) {
        Message m(msg.get());
        std::cout << "get message:" << " from:" << m.from << " to:" << m.to
                  << " size:" << m.size << " content:" << m.content << "\n";
    }
    return 0;
}
```
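The enqueue above only accepts zero-argument callables, which is why main pre-binds the argument with std::bind. As a closing sketch (my own extension, not part of the original code), the same member function can be generalized with a variadic template and perfect forwarding so that callers pass the arguments directly; it assumes the same mtx, tasks, cv and stop members as the ThreadPools class above.

```cpp
// Hypothetical variadic overload of ThreadPools::enqueue; the binding,
// shared_ptr ownership and queueing logic are unchanged from the version above.
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    // Fold the callable and its arguments into one zero-argument task.
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(mtx);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task]() { (*task)(); });
    }
    cv.notify_one();
    return res;
}
```

With such an overload, the squaring tasks in main would reduce to `results.push_back(tp.enqueue([](int i) { return i * i; }, i));`, with no intermediate std::function.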