0%

C++ Linux多线程同步通信-信号量

原文链接:C++ Linux多线程同步通信-信号量

介绍

C++多线程能够提升程序的资源利用率,提升效率.涉及到的库有:
thread,
mutex,
chrono,
semaphore

线程库thread

1
2
3
4
5
6
7
8
9
构造函数:线程构造的参数需要使用ref和cref包装后保证深拷贝
std::thread t() 空线程对象
std::thread t(func,args) 线程函数对象
std::thread t(thread u) 移动线程对象
std::thread t=u 赋值形式的移动线程对象
成员函数:
bool joinable() 是否可加入
.join() 加入

几种构造形式

  1. 空线程
  2. 函数线程
  3. 类函数线程
  4. 移动复制线程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    #include <iostream>
    #include <thread>
    #include <chrono>
    using namespace std;

    int func1(int n){
    cout<<"func1 thread run\n";
    for(int i=0;i<n;i++){
    cout<<"func1 thread:"<<i<<"\n";
    this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return 0;
    }

    class M{
    public:
    M(){}
    int func2(int n){
    cout<<"M.func2 thread run\n";
    for(int i=0;i<n;i++){
    cout<<"M.func2 thread:"<<i<<"\n";
    this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return 0;
    }
    };

    int main()
    {

    thread t1; //空线程
    thread t1_(func1,4); //函数参数线程
    M m;
    thread t2(&M::func2,&m,3); //带参数类线程

    cout<<"t1 is:"<<t1.joinable()<<" t1_ is:"<<t1_.joinable()<<"\n";
    t1=move(t1_); //移动线程
    //或者thread t2(move(t1_));
    cout<<"t1 is:"<<t1.joinable()<<" t1_ is:"<<t1_.joinable()<<"\n";

    if(t1.joinable()) t1.join();
    if(t1_.joinable()) t1_.join();
    if(t2.joinable()) t2.join();


    return 0;
    }

线程传参

对于一般数据可以直接传入,但是引用数据需要深拷贝传参.

当主线程引用数据改变时,其他线程使用ref深拷贝的同步改变.但是其他线程改变数据不影响主线程的对象(?)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <thread>
#include <vector>
using namespace std;

int func1(int n, vector<int> a) {
cout << "other thread arr[n-1]=" << a[n-1] << "\n";
a[0]=n-1;
return 0;
}

int main() {
int n = 10;
vector<int> a(n);
for (int i = 0; i < n; i++) a[i] = i;
vector<int> b(n);
for (int i = 0; i < n; i++) b[i] = i;

thread t1(func1, n, ref(a)); // 传递引用
thread t2(func1, n, b); // 传递副本

a[n-1]=0;
b[n-1]=0;

// 等待线程完成
if (t1.joinable()) t1.join();
if (t2.joinable()) t2.join();

cout << "main thread a[0]=" << a[0] << "\n";
cout << "main thread b[0]=" << b[0] << "\n";
return 0;
}


other thread arr[n-1]=9
other thread arr[n-1]=0
main thread a[0]=0
main thread b[0]=0

线程同步与通信

同步

互斥锁

互斥锁能够实现简单的互斥操作,保证临界区互斥访问,实现方式有3种:

  1. mutex手动加锁: 手动加锁和解锁不可靠,当程序异常或忘记加锁时可能死锁

  2. lock_guard自动锁: 变量的生命周期就是临界区范围,可以通过手动{}实现

  3. unique_lock: 能够实现同时上多个锁,使用defer_lock手动上锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

#include <iostream>
#include<thread>
#include<mutex>
#include<chrono>

using namespace std;
int c=0;

mutex m;
// 1.
void func1() {
for(int i=0; i<3; i++) {
m.lock();
c++;
m.unlock();
cout<<c<<"\t";

}
}

// 2.
void func2() {
for(int i=0; i<3; i++) {
{
lock_guard<mutex> guard(m);
c++;
}
cout<<c<<"\t";

}
}
// 3.
mutex n1;
mutex n2;
void func3() {
for(int i=0; i<3; i++) {
{
unique_lock<mutex> um(m,defer_lock);
unique_lock<mutex> un1(n1,defer_lock);
unique_lock<mutex> un2(n2,defer_lock);
//before lock ..
lock(um,un1,un2); //同时上锁
c++;
}
cout<<c<<"\t";

}
}

int main()
{
thread t1(func3),t2(func3);
if(t1.joinable()) t1.join();
if(t2.joinable()) t2.join();

return 0;
}

信号量

提供 counting_semaphore和二维信号量,但是二维的可用用mutex.
counting_semaphore没有直接提供获取当前信号量值的函数,可以使用POSIX实现或者对counting实现一个封装

直接counting_semaphore实现的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

#include <iostream>
#include<thread>
#include<chrono>
#include <semaphore>

using namespace std;
counting_semaphore Empty(3),Use(0);
int v=0;
void producer() {
int i=0;
while(i++<10){
Empty.acquire();
cout<<"create 1\n";
Use.release(1);
}
}
void consumer() {
int i=0;
while(i++<10){
Use.acquire();
cout<<"use 1\n";
Empty.release(1);
}
}
int main()
{
thread t1(producer),t2(consumer);
if(t1.joinable()) t1.join();
if(t2.joinable()) t2.join();

return 0;
}

实现可计数的信号量,该信号量可以实现进程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
#include<thread>
#include<chrono>
#include <semaphore>

using namespace std;
sem_t Empty,Use;
int v=0;
void producer() {
int i=0;
while(i++<10){
sem_wait(&Empty);
sem_getvalue(&Empty,&v);
cout<<"create 1 contain("<<v<<")\n";
this_thread::sleep_for(chrono::milliseconds(10));
sem_post(&Use);
}
}
void consumer() {
int i=0;
while(i++<10){
sem_wait(&Use);
sem_getvalue(&Empty,&v);
cout<<"use 1 contain("<<v<<")\n";
this_thread::sleep_for(chrono::milliseconds(1));
sem_post(&Empty);
}
}
int main()
{

sem_init(&Empty, 0, 3);
sem_init(&Use, 0, 0);
thread t1(producer),t2(consumer),t3(consumer);
if(t1.joinable()) t1.join();
if(t2.joinable()) t2.join();
if(t3.joinable()) t3.join();

return 0;
}

通信

线程的通信通过全局的共享变量搭配信号量即可完成.需要专门的通信主要是对于进程而言