原文链接:C++11标准 future异步线程库
介绍
condition variable是c++11标准中用于线程协调同步的对象.主要包括wait和notify两个操作.
wait判断条件决定是否阻塞,notify则负责唤醒阻塞的线程.
条件变量因为是进程间共享的,因此需要互斥访问
标准库
库函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| std::condition_variable cv;
void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate > void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
wait还有wait_for和wait_until函数,不太常见,就不列了
void notify_one() noexcept; void notify_all() noexcept;
|
C++的wait因为是线程共享,所以需要加锁,要求必须使用unique_lock.
unique_lock的优点在于:
- unique_lock在异常或者忘记释放锁时,能在生命周期结束时自动解锁,实现线程安全,防止死锁
- unique_lock支持手动解锁,条件变量能自动用unique_lock管理锁的状态
实例
生产和消费者:生产者生产5个到队列,然后设置stop结束标志,返回 消费者循环接收,如果stop标志为true且队列空就退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #include<iostream> #include<thread> #include<condition_variable> #include<queue>
std::queue<int> q; std::mutex mtx; std::condition_variable cv; #define MAX_Q 3 bool stop=false;
void producer(){ for(int i=0;i<5;i++){ std::unique_lock<std::mutex> lock(mtx); cv.wait(lock,[](){return q.size()<MAX_Q;}); q.push(i); std::cout<<"push:"<<i<<"\n"; lock.unlock(); cv.notify_all(); } stop=true; std::cout<<"stop producer()\n"; } void consumer(){ while(true){ std::unique_lock<std::mutex> lock(mtx); cv.wait(lock,[](){return !q.empty()||stop;}); if(q.empty()&&stop){ std::cout<<"stop consumer()\n"; return ; } int c=q.front();q.pop(); lock.unlock(); std::cout<<"pop:"<<c<<"\n"; cv.notify_all(); } }
int main(){ std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; }
|
模拟线程池
简单实现一个线程池的条件变量控制逻辑,不实现具体操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| #include<iostream> #include<queue> #include<functional> #include<condition_variable> #include<vector> #include<thread> #include<future> #include<atomic>
class ThreadPools{ public: ThreadPools(int n_threads): stop(false){ for(int i=0;i<n_threads;i++){ workers.emplace_back([this](){ while(true){ int task; { std::unique_lock<std::mutex> lock(this->mtx); this->cv.wait(lock,[this](){return this->stop||!this->tasks.empty();}); if(this->stop&&this->tasks.empty()) return ; task=this->tasks.front(); this->tasks.pop(); } std::cout<<"run task:"<< task<<"\n"; } }); } }; ~ThreadPools(){ { std::unique_lock<std::mutex> lock(mtx); stop=true; } cv.notify_all(); for(std::thread &work: workers){ work.join(); } std::cout<<"thread pools final stop\n"; }; void enqueue(int task){ { std::unique_lock<std::mutex> lock(mtx); if(stop){ std::cout<<"push to a stop pools\n"; return ; } tasks.push(task); } std::cout<<"push task:"<< task<<"\n"; cv.notify_one(); return ; }; private: std::vector<std::thread> workers; std::queue<int> tasks; std::condition_variable cv; std::mutex mtx; std::atomic<bool> stop; };
int main(){ ThreadPools pools(2); for(int i=0;i<10;i++) pools.enqueue(i); return 0; }
|
特性
- condition_variable是跨平台的
- 使用简单,直接提供了线程阻塞和唤醒机制,减去了复杂的信号量同步设计
意义
能简化多线程的同步设计