0%

C++11标准 条件变量condition variable

原文链接:C++11标准 future异步线程库

介绍

condition variable是c++11标准中用于线程协调同步的对象.主要包括wait和notify两个操作.

wait判断条件决定是否阻塞,notify则负责唤醒阻塞的线程.

条件变量因为是进程间共享的,因此需要互斥访问

标准库

库函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//构造对象
std::condition_variable cv;

// 阻塞线程
void wait( std::unique_lock<std::mutex>& lock );

// pred为0 阻塞 非0表示条件满足,不阻塞
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

wait还有wait_for和wait_until函数,不太常见,就不列了


void notify_one() noexcept; // 唤醒一个线程
void notify_all() noexcept; // 唤醒所有线程

C++的wait因为是线程共享,所以需要加锁,要求必须使用unique_lock.

unique_lock的优点在于:

  1. unique_lock在异常或者忘记释放锁时,能在生命周期结束时自动解锁,实现线程安全,防止死锁
  2. unique_lock支持手动解锁,条件变量能自动用unique_lock管理锁的状态

实例

生产和消费者:生产者生产5个到队列,然后设置stop结束标志,返回 消费者循环接收,如果stop标志为true且队列空就退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include<iostream>
#include<thread>
#include<condition_variable>
#include<queue>

std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;
#define MAX_Q 3
bool stop=false;

void producer(){
for(int i=0;i<5;i++){
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock,[](){return q.size()<MAX_Q;});
q.push(i);
std::cout<<"push:"<<i<<"\n";
lock.unlock();
cv.notify_all();
}
stop=true;
std::cout<<"stop producer()\n";
}
void consumer(){
while(true){
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock,[](){return !q.empty()||stop;});
if(q.empty()&&stop){
std::cout<<"stop consumer()\n";
return ;
}
int c=q.front();q.pop();
lock.unlock();
std::cout<<"pop:"<<c<<"\n";
cv.notify_all();
}
}

int main(){
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}

模拟线程池

简单实现一个线程池的条件变量控制逻辑,不实现具体操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include<iostream>
#include<queue>
#include<functional>
#include<condition_variable>
#include<vector>
#include<thread>
#include<future>
#include<atomic>

class ThreadPools{
public:
ThreadPools(int n_threads): stop(false){
for(int i=0;i<n_threads;i++){
workers.emplace_back([this](){
// 每个线程构造函数
while(true){
int task;
{ //锁的作用域
std::unique_lock<std::mutex> lock(this->mtx); //获取队列锁
//判断队列非空或者线程池stop则返回.
this->cv.wait(lock,[this](){return this->stop||!this->tasks.empty();});

if(this->stop&&this->tasks.empty()) return ;// 如果线程池stop且队列空,结束线程.

task=this->tasks.front();
this->tasks.pop();
}
std::cout<<"run task:"<< task<<"\n";
}
});
}
};
~ThreadPools(){
{
std::unique_lock<std::mutex> lock(mtx);
stop=true;
}
cv.notify_all();

//注意必须是 &work引用参数形式, 线程不允许复制构造
for(std::thread &work: workers){
work.join();
}
std::cout<<"thread pools final stop\n";
};

void enqueue(int task){
{
std::unique_lock<std::mutex> lock(mtx);
if(stop){
std::cout<<"push to a stop pools\n";
return ;
}
tasks.push(task);
}
std::cout<<"push task:"<< task<<"\n";
cv.notify_one();
return ;
};

private:
std::vector<std::thread> workers;
std::queue<int> tasks;
std::condition_variable cv;
std::mutex mtx;
std::atomic<bool> stop;
};

int main(){
ThreadPools pools(2);
for(int i=0;i<10;i++) pools.enqueue(i);

return 0;
}

特性

  1. condition_variable是跨平台的
  2. 使用简单,直接提供了线程阻塞和唤醒机制,减去了复杂的信号量同步设计

意义

能简化多线程的同步设计