0%

C++11标准 future异步线程库

原文链接:C++11标准 future异步线程库

前言

c++标准有很多的版本,比较知名的如c++98是第一版c++标准,提供了c++最底层的支持. 后面的c++11和c++20每个版本都会给c++带来新特性.

而线程就是是c++11最重要的特性.

虽然c++98有pthread库,但是c++11的thread有更好的跨平台能力,最重要的是c++11很多线程特性都是在thread库基础上实现的.

例如本节的异步线程future库,以及其他的如信号量,智能指针,functional函数封装,万能模板,完美转发等等.

这些特性为c++提供了原生线程支持.

thread实现 普通的异步线程

主线程创建一个线程执行操作,在这个过程中子线程和主线程并发执行.

如果该线程需要返回一个计算结果,此时可以通过传入变量的引用传参或全局变量实现.例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <chrono>

// 异步任务函数
void asyncTask(int& a) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
a+=1;
}

int main() {
int a = 5;
std::cout << "Main thread is starting...\n";

std::thread t1(asyncTask, std::ref(a));

std::cout << "Main thread is doing other work...\n";

if (t1.joinable()) {
t1.join();
}

std::cout << "Result from thread: " << a << "\n";
std::cout << "Main thread is completed.\n";
return 0;
}

主线程通过引用参数,可以获取计算结果

如果主线程和子线程执行过程中都要操作同一个对象,那就还需要信号量实现 临界区操作的线程同步

future的作用

c++11 库主要提供了4种不同层次的对象,分别适用不同应用场景.

std::future: future可以表示一个线程返回的结果,.get() 会阻塞直到异步操作完成,还有其他的一些特性
std::promise: future可以绑定promise,可以在一个线程中通过promise设置值,然后在另一个线程中使用future获取这个值
std::async: 线程的高阶封装,async会自动创建线程执行操作,并且返回future对象获取结果
std::packaged_task: 相比async更加灵活,可以将一个任务封装成 packaged_task对象,显式调用thread执行.

packaged_task是实现线程池的关键对象,可以保证线程的重复利用,而不是每个任务就新创建线程

还有一些其他的对象:
std::shared_future

future

函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//模板类型
template< class T > class future;
template< class T > class future<T&>;
template<> class future<void>;

T/T&/void get();
一直阻塞直到返回对象值

//对象是否有效
bool valid() const noexcept;
当future对象未绑定对象或者对象已经获取过了,返回false
return true(有效) false(无效)

//3 种wait函数

//阻塞到获得返回值
void wait() const;
//阻塞一定时间
template< class Rep, class Period >
std::future_status wait_for( const std::chrono::duration<Rep,Period>& timeout_duration ) const;
//阻塞到设定时间点
template< class Clock, class Duration >
std::future_status wait_until( const std::chrono::time_point<Clock,Duration>& timeout_time ) const;

std::future_status有3种状态:
future_status::ready 准备好了
future_status::timeout 超时
future_status::deferred std::async通过参数std::launch::deferred创建一个延迟执行的异步线程,该状态表示程序还未开始执行,当调用get或wait才开始执行

特性

  1. future本身无法单独使用,要结合promise或async才能绑定对象
  2. future只能移动构造,不能复制

promise

函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//模板类型
template< class R > class promise;
template< class R > class promise<R&>;
template<> class promise<void>;

//获取future
std::future<R> get_future();
该返回的对象是promise唯一关联的future对象,不能复制,只能移动
return future对象

//设置值,可以传入右值引用,引用,常引用
设置完毕后future此时变为ready
void set_value( const R& value );
void set_value( R&& value );
void set_value( R& value );

延迟设置,当线程结束时才设置,参数同set_value
void set_value_at_thread_exit(const R&/R&&/R& value);

特性

  1. promise与future是唯一对应的

示例:子线程等一秒返回数据,主线程get等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include<future>
#include<thread>
#include<iostream>
#include<chrono>

int main(){
std::promise<int> p;

std::future<int> f=p.get_future();

std::thread t([](std::promise<int>& p){
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
p.set_value(1024);
},std::ref(p));

std::cout<<f.get()<<"\n";
return 0;
}

示例:子线程等一秒返回数据,主线程wait_for等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<future>
#include<thread>
#include<iostream>
#include<chrono>

int main(){
std::promise<int> p;

std::future<int> f=p.get_future();

std::thread t([](std::promise<int>& p){
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
p.set_value(1024);
},std::ref(p));
t.detach();
while(f.valid()){
std::future_status status=f.wait_for(std::chrono::duration<int,std::milli>(500));
if(status==std::future_status::ready){
std::cout<<"get value="<<f.get()<<"\n";
}else if(status==std::future_status::timeout){
std::cout<<"time out\n";
}
}
return 0;
}

输出:

1
2
3
time out
get value=1024
terminate called without an active exception

不加detach时发现程序出错了,主线程退出时子线程未结束

有两种方法,1是detach脱离主线程,2是join等待

在异步多线程里面一般是脱离主线程处理

async

函数

1
2
3
4
5
6
7
8
9
//不指定策略
template< class F, class... Args >
std::future async( F&& f, Args&&... args );
//指定策略
template< class F, class... Args >
std::future async( std::launch policy, F&& f, Args&&... args );

policy有3async(立即执行) deferred(延迟执行) any(自动选择)

特性

特点:

  1. deferred会使线程延迟到当future被wait或get调用时才执行. deferred的线程如果一直没执行的话,一定程度上能降低负载.
  2. deferred策略不能和wait_for搭配使用,会导致线程一直不执行
  3. async+future能大幅度减少线程的代码量

优点:
简单,API直接使用,提高异步线程开发效率

缺点:
async会隐式创建线程,会增大程序性能开销

示例:deferred+wait_for 程序死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<future>
#include<thread>
#include<iostream>
#include<chrono>

int main(){

std::future<int> f=std::async(std::launch::deferred,[](int a)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return a+1;
},1023);

while(f.valid()){
std::future_status status=f.wait_for(std::chrono::duration<int,std::milli>(500));
if(status==std::future_status::ready){
std::cout<<"get value="<<f.get()<<"\n";
}else if(status==std::future_status::timeout){
std::cout<<"time out\n";
}
}

return 0;
}

packaged_task:和future一样只能移动构造

函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//packaged_task构造,R是返回类型, ArgTypes...是函数的参数类型
template< class R, class ...ArgTypes >
class packaged_task<R(ArgTypes...)>;

//交换两个任务
void swap( packaged_task& other ) noexcept;

// 验证共享对象是否有效
bool valid()
return true(future对象有效) false(无效)

// 返回future对象
std::future<R> get_future();

// 同promise中,当线程结束时才设置future
void make_ready_at_thread_exit();

// 重置task状态,允许任务未执行前重新获取future对象
void reset();

特性

packaged_task最大特点是可用结合wrapper实现无参封装,同时不会自动创建线程.

示例1:packaged_task创建任务并通过thread执行

注意packaged_task在传给线程的时候必须move修饰为一个右值(将亡值),因为不可复制构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<iostream>
#include<future>
#include<thread>
#include<chrono>
#include<unistd.h>
int main(){
//定义task
std::packaged_task<int(int)> task([=](int a){return a+1;});

//定义两个返回值,先get future一次,此时的task 已经和ret1绑定
std::future<int> ret1,ret2;
ret1=task.get_future();
//在任务未开始前重置task 和ret2绑定
task.reset();
ret2=task.get_future();

std::thread t(std::move(task),1023);
t.detach();

std::cout<<ret2.get()<<"\n";

return 0;
}

示例2: 使用packaged_task+function封装,实现一个简单的线程池

参考: C++实现一个线程池