0%

C++实现一个线程池

原文链接:C++实现一个线程池

介绍

线程池是提高CPU利用率的一种非常高效的方法：通过预先创建多个线程，有任务时就执行，无任务时就阻塞等待。

相比一般的多线程方法，线程池更加简单、模块化，并且效率更高，因为不会重复地创建和销毁线程。

预备知识

异步线程(包括future,packaged_task等对象): 创建异步返回的线程

wrapper装饰器(function+bind): 实现函数封装,泛型编程

condition_variable条件变量: 线程池任务分发和执行的条件控制

shared_ptr智能指针: 优化内存管理

类型推导和后置返回类型: 怎么实现泛型下正确返回未知类型

完美转发: 了解如何实现完美转发

线程池逻辑实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include<atomic>
#include<condition_variable>
#include<functional>
#include<future>
#include<iostream>
#include<mutex>
#include<queue>
#include<thread>
#include<vector>

// A minimal fixed-size thread pool whose tasks are plain ints.
// Workers block on a condition variable until a task arrives or the
// pool begins shutting down; remaining tasks are drained before exit.
class ThreadPools{
public:
    // Spawn n_threads workers; each loops forever pulling queued tasks.
    ThreadPools(int n_threads): stop(false){
        for(int i = 0; i < n_threads; ++i){
            workers.emplace_back([this](){
                for(;;){
                    int task;
                    {
                        // Scope of the queue lock.
                        std::unique_lock<std::mutex> lock(this->mtx);
                        // Wake when work is available or shutdown started.
                        this->cv.wait(lock, [this](){
                            return this->stop || !this->tasks.empty();
                        });
                        // Stopped and drained: this worker exits.
                        if(this->stop && this->tasks.empty()) return;
                        task = this->tasks.front();
                        this->tasks.pop();
                    }
                    // Execute outside the lock so workers run in parallel.
                    std::cout << "run task:" << task << "\n";
                }
            });
        }
    }

    // Signal shutdown, wake every worker, and join them all.
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        // Must take each thread by reference: std::thread is not copyable.
        for(std::thread &work : workers){
            work.join();
        }
        std::cout << "thread pools final stop\n";
    }

    // Queue one task; submissions after shutdown are rejected with a message.
    void enqueue(int task){
        {
            std::unique_lock<std::mutex> lock(mtx);
            if(stop){
                std::cout << "push to a stop pools\n";
                return;
            }
            tasks.push(task);
        }
        std::cout << "push task:" << task << "\n";
        cv.notify_one();
    }

private:
    std::vector<std::thread> workers;   // worker threads
    std::queue<int> tasks;              // pending int tasks
    std::condition_variable cv;         // signals work available / stop
    std::mutex mtx;                     // guards tasks (and stop writes)
    std::atomic<bool> stop;             // shutdown flag
};

// Demo: two workers race to consume ten int tasks; the pool's
// destructor joins the workers (draining the queue) before exit.
int main(){
    ThreadPools pools(2);
    for(int i = 0; i < 10; ++i) pools.enqueue(i);

    return 0;
}  // NOTE: this closing brace was missing in the original article text.
简单线程池

实现一个线程池,并执行三种任务: 有参无返回值任务,有参有返回值任务 和 有参返回消息结构体任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#include<atomic>
#include<condition_variable>
#include<cstring>
#include<functional>
#include<future>
#include<iostream>
#include<memory>
#include<mutex>
#include<queue>
#include<stdexcept>
#include<string>
#include<thread>
#include<type_traits>
#include<vector>

// Generic fixed-size thread pool: submitted callables are type-erased
// into std::packaged_task<void()> so the queue holds one uniform task
// type, while callers receive results through a std::future (enqueue).
class ThreadPools{
public:
    // Launch n_threads workers that loop, executing queued tasks.
    ThreadPools(int n_threads): n_threads(n_threads), stop(false){
        for(int i = 0; i < n_threads; ++i){
            workers.emplace_back([this](){
                for(;;){
                    std::packaged_task<void()> job;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        // Sleep until a task is queued or shutdown begins.
                        this->cv.wait(lock, [this](){
                            return this->stop || !this->tasks.empty();
                        });
                        // Shutdown with an empty queue: worker exits.
                        if(this->stop && this->tasks.empty()) return;
                        // packaged_task is move-only; take it off the queue.
                        job = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // Run outside the lock so workers execute in parallel.
                    job();
                }
            });
        }
    }

    // Flag shutdown, wake every worker, and join them all.
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for(std::thread &worker : workers){
            worker.join();
        }
    }

    // Submit any zero-argument callable; returns a future for its result.
    // (Defined out of class, below.)
    template<typename F>
    auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;

private:
    int n_threads;                                 // configured pool size
    std::atomic<bool> stop;                        // shutdown flag
    std::vector<std::thread> workers;              // worker threads
    std::queue<std::packaged_task<void()>> tasks;  // pending type-erased tasks
    std::condition_variable cv;                    // signals work / stop
    std::mutex mtx;                                // guards the task queue
};

template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{

using return_type=typename std::result_of<F()>::type;

auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
std::future<return_type> res=task->get_future();

{
std::unique_lock<std::mutex> lock(mtx);
if(stop){
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task](){(*task)();});
}
cv.notify_one();
//std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";

return res;
}



// Simple chat-style message record carrying routing info and a payload.
class Message{
public:
    // `content` is copied into the message. Taking it by const reference
    // (the original used a non-const reference) is backward compatible
    // and additionally allows temporaries and string literals.
    Message(int fd, int from, int to, const std::string& content)
        : fd(fd), from(from), to(to),
          size(static_cast<int>(content.size())),  // explicit size_t -> int narrowing
          content(content) {}

    int fd;               // originating descriptor / id
    int from;             // sender id
    int to;               // recipient id
    int size;             // payload length in bytes
    std::string content;  // payload text
};

// Exercise the pool with three task flavors: fire-and-forget,
// int-returning, and struct-returning tasks.
int main(){

    ThreadPools tp(3);

    // 1) Tasks with no return value.
    for(int i = 0; i < 10; ++i){
        tp.enqueue([i](){
            std::cout << "task " << i << "\n";
        });
    }

    // 2) Tasks returning an int. (Real code would usually poll the
    //    futures rather than block on each one in submission order.)
    std::vector<std::future<int>> results;
    for(int i = 0; i < 10; ++i){
        auto square = [](int v) { return v * v; };
        std::function<int()> bound = std::bind(square, i);
        results.push_back(tp.enqueue(bound));
    }
    for(auto &result : results){
        std::cout << "get result=" << result.get() << "\n";
    }

    // 3) Tasks returning a message struct -- the more common real-world case.
    std::vector<std::future<Message>> msgs;
    for(int i = 0; i < 10; ++i){
        auto make_msg = [](int v) {
            std::string m = "a message from:";
            m += std::to_string(v);
            Message msg(v, v, v, m);
            return msg;
        };
        std::function<Message()> bound = std::bind(make_msg, i);
        msgs.push_back(tp.enqueue(bound));
    }

    for(auto &msg : msgs){
        Message m(msg.get());
        std::cout << "get message:" << " from:" << m.from << " to:" << m.to
                  << " size:" << m.size << " content:" << m.content << "\n";
    }

    return 0;
}







// task task 1
// task task 3
// 2task 4

// task 5
// 0task 6

// task 8
// task 9
// get result=task 07

// get result=1
// get result=4
// get result=9
// get result=16
// get result=25
// get result=36
// get result=49
// get result=64
// get result=81
// get message: from:0 to:0 size:16 content:a message from:0
// get message: from:1 to:1 size:16 content:a message from:1
// get message: from:2 to:2 size:16 content:a message from:2
// get message: from:3 to:3 size:16 content:a message from:3
// get message: from:4 to:4 size:16 content:a message from:4
// get message: from:5 to:5 size:16 content:a message from:5
// get message: from:6 to:6 size:16 content:a message from:6
// get message: from:7 to:7 size:16 content:a message from:7
// get message: from:8 to:8 size:16 content:a message from:8
// get message: from:9 to:9 size:16 content:a message from:9