C++ I/O Multiplexing: the epoll Model

Original post: C++ I/O Multiplexing: the epoll Model

Prerequisites

Background worth knowing before the epoll model; see:

IO control: the fcntl library (IO control)

Multithreading: C++ Linux multithreaded synchronization with semaphores

Sockets: C++ Linux multi-process socket communication

select model: C++ IO multiplexing with select

poll model: C++ IO multiplexing with poll

The epoll Model

Characteristics

Principle

epoll is a more efficient IO-multiplexing model than select and poll.

Internally it is typically implemented with a red-black tree plus a ready list: the red-black tree handles insertion, lookup, and deletion of monitored fds, and when events become active the kernel moves them onto the ready list.

Advantages

By combining the red-black tree with the ready list, epoll can manage IO efficiently under high concurrency; with large numbers of connections it outperforms select and poll.

Disadvantages

epoll's data structures are more complex, so when there are only a few connections the overhead of maintaining the epoll instance itself is relatively high.

Library functions: <sys/epoll.h>

#include <sys/epoll.h>
#include <errno.h>

// Create an epoll instance
int epoll_create(int size)
size: a hint for the expected number of monitored fds; not binding, and ignored since Linux 2.6.8 (it only has to be > 0)
return: >=0 (fd of the epoll instance)  -1 (error)

// Create an epoll instance
int epoll_create1(int flags)
flags: 0 (no flags) or EPOLL_CLOEXEC (the epoll fd is closed automatically when the process execs a new program)
return: >=0 (fd of the epoll instance)  -1 (error)

Both epoll_create and epoll_create1 return an fd that you must close() yourself; with EPOLL_CLOEXEC, epoll_create1 additionally ensures the fd is closed automatically when a child process execs a new program, so it does not leak.

Using epoll_create1 is therefore safer than epoll_create.



// Manage an epoll instance
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd: the fd returned by epoll_create/epoll_create1
op: EPOLL_CTL_ADD (add a monitored fd)  EPOLL_CTL_MOD (modify a monitored fd)  EPOLL_CTL_DEL (remove a monitored fd)
fd: the IO fd to operate on
event: an epoll_event structure holding the event mask and user data
return: 0 (success)  -1 (error)
// event structure
struct epoll_event {
    __uint32_t   events; /* epoll event mask */
    epoll_data_t data;   /* user data */
};

// events flags:
EPOLLIN      readable
EPOLLPRI     urgent data
EPOLLOUT     writable
EPOLLHUP     hang-up, the fd was closed
EPOLLET      edge-triggered mode: you are notified only when the state changes, with no further notification while data remains; the leftover data can still be handled later
EPOLLONESHOT an even stricter mode: the event is reported only once (the fd must be re-armed with epoll_ctl); if it is not handled in this epoll round, it will not be returned in later rounds
EPOLLRDHUP   peer half-closed the connection: it will not send any more data, but we can still write to it (TCP half-close)
EPOLLERR     error on the fd
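
// A minimal sketch of how EPOLLET is typically used (illustrative, not from the original post):
// the fd should be non-blocking and must be drained until read() reports EAGAIN,
// otherwise data left in the kernel buffer will not trigger another notification.
// conn_fd, epfd and buf are made-up names; fcntl comes from <fcntl.h>, read from <unistd.h>.
fcntl(conn_fd, F_SETFL, fcntl(conn_fd, F_GETFL, 0) | O_NONBLOCK);  // ET needs a non-blocking fd
struct epoll_event ev;
ev.events  = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);

char buf[4096];
while (true) {                                  // drain until the kernel buffer is empty
    ssize_t n = read(conn_fd, buf, sizeof(buf));
    if (n > 0)       { /* consume buf[0..n) */ }
    else if (n == 0) break;                     // peer closed
    else if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // fully drained
    else break;                                 // real error
}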

// user data
typedef union epoll_data
{
    void*    ptr;  // pointer to user data associated with this fd
    int      fd;   // the target file descriptor the event belongs to
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

The user-data field is very useful in more complex communication designs: for example, ptr can point to a structure holding the per-connection information such as an ID.
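
// A small sketch of the data.ptr idea (illustrative, not from the original post):
// Connection is a hypothetical per-connection struct; the pointer stored when the fd is
// registered comes back unchanged in every epoll_wait result for that fd.
// client_fd and next_id are made-up names.
struct Connection { int fd; int id; /* buffers, state ... */ };

Connection* conn = new Connection{client_fd, next_id++};
struct epoll_event ev;
ev.events   = EPOLLIN;
ev.data.ptr = conn;                       // store the pointer (data is a union, so not the fd)
epoll_ctl(epfd, EPOLL_CTL_ADD, conn->fd, &ev);

// later, in the event loop:
// Connection* c = static_cast<Connection*>(events[i].data.ptr);
// read from c->fd, update c's state; delete c when the connection closes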




int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// epoll_wait copies only the ready events into the events array, so they can simply be processed in order
epfd: the fd returned by epoll_create/epoll_create1
events: output array that receives the ready events
maxevents: maximum number of events returned in one call
timeout: -1 (block indefinitely)  0 (return immediately)  >0 (upper bound in milliseconds)
return: >0 (number of ready events)  0 (timed out)  -1 (error)
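
Putting the three calls together, the sketch below is a minimal level-triggered epoll server loop that only illustrates the API documented above; it is not the test program of this post. run_epoll_loop and listen_fd are made-up names, and listen_fd is assumed to be a socket that has already been bound and put into the listening state.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

void run_epoll_loop(int listen_fd) {
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd == -1) { perror("epoll_create1"); return; }

    struct epoll_event ev{};
    ev.events  = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);       // register the listening socket

    struct epoll_event ready[64];
    while (true) {
        int n = epoll_wait(epfd, ready, 64, -1);          // block until something is ready
        for (int i = 0; i < n; i++) {
            if (ready[i].data.fd == listen_fd) {          // new connection
                int conn = accept(listen_fd, nullptr, nullptr);
                if (conn == -1) { perror("accept"); continue; }
                ev.events  = EPOLLIN;
                ev.data.fd = conn;
                epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev);
            } else {                                      // data from an existing connection
                char buf[1024];
                ssize_t r = read(ready[i].data.fd, buf, sizeof(buf));
                if (r <= 0) {                             // peer closed or error: unregister and close
                    epoll_ctl(epfd, EPOLL_CTL_DEL, ready[i].data.fd, nullptr);
                    close(ready[i].data.fd);
                }
                // otherwise handle buf[0..r)
            }
        }
    }
}

Because this loop is level-triggered, any fd that still has unread data is reported again by the next epoll_wait call, which is what makes the simple read-once handling acceptable here.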

Example: a concurrency test server built on epoll, measuring the performance of a Reactor-style design (IO multiplexing + thread pool)

Designing the message structure

In this kind of high-concurrency scenario the message type actually affects how requests are processed. To keep the project simple, every message has a fixed size of 16 B and carries one long and one double.
On the test side, m processes each send the values 1, 2, ..., n (incrementing by 1 each time), so both accumulated sums should come out to m·n(n+1)/2. Once the server has handled all requests it tears down its thread pool and verifies that the result is correct; a quick check of the expected value follows the struct below.

struct MessageData{ // 16 B: one 8-byte long plus one 8-byte double
    long   c1;
    double c2;
};
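
As a quick sanity check of the formula above (a throwaway snippet, not part of the test programs): with m = 1 and n = 10000 the accumulated long sum should be 1·10000·10001/2 = 50005000, which matches the "correct" value the server prints.

#include <iostream>

int main() {
    const long m = 1, n = 10000;                     // 1 client process, 10000 requests
    long   expected_long   = m * n * (n + 1) / 2;    // 50005000
    double expected_double = static_cast<double>(expected_long);
    std::cout << expected_long << " " << expected_double << "\n";   // prints: 50005000 5.0005e+07
}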
Thread pool

See the earlier post: Implementing a Thread Pool in C++

Program

Honestly this code feels pretty rough and does not really showcase epoll's strengths; it mostly just leans on the thread pool, which, to be fair, is genuinely useful.

  1. It is hard to open a large number of connections in this test setup, so the test uses only a single connection.
  2. One anomalous run produced: get result:LongSum=1729343879082347468 DoubleSum=5.0015e+07(correct=50005000 real recv=10001, should recv=10000) — one packet too many was received.
  3. Another run failed with Connection reset by peer, supposedly because the concurrency was too high and the peer dropped the connection, but it could not be reproduced afterwards, so the exact cause is unclear.

Normal output:

send finish, error counts=0

get result:LongSum=50005000 DoubleSum=5.0005e+07(correct=50005000 real recv=10000, should recv=10000)
server.cpp: the server, built on epoll plus the thread pool, used to test the server's responsiveness and stability
// server: epoll + thread pool
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<cstring>
#include<cstdlib>

#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>

#include<chrono>
#include<chrono>

#define N_REQUESTS 10000
#define M_PROCESSES 1
#define MAX_CONNS 1
#define TOTAL_REQUESTS (N_REQUESTS*M_PROCESSES)

struct MessageData{ // 16B
    long   c1;
    double c2;
};

long   LongSum=0;
double DoubleSum=0;
std::mutex sum_mtx;   // protects LongSum/DoubleSum, which are updated from several worker threads

bool finish=false;

class ThreadPools{
public:
    ThreadPools(int n_threads): n_threads(n_threads),stop(false){
        for(int i=0;i<n_threads;i++){
            workers.emplace_back([this](){
                while(true){
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});

                        if(this->stop && this->tasks.empty()) return;

                        task=std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();   // run the task outside the lock so workers can actually execute in parallel
                    //std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";
                }
            });
        }
    }
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop=true;
        }
        cv.notify_all();
        for(std::thread &worker : workers){
            worker.join();
        }
        //std::cout<<"thread pools terminated\n";
    }

    template<typename F>
    auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;

private:
    int n_threads;
    std::atomic<bool> stop;
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<void()>> tasks;
    std::condition_variable cv;
    std::mutex mtx;
};

template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{

    using return_type=typename std::result_of<F()>::type;

    auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    std::future<return_type> res=task->get_future();

    {
        std::unique_lock<std::mutex> lock(mtx);
        if(stop){
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){(*task)();});
    }
    cv.notify_one();
    //std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";

    return res;
}


int main(){

    int epoll_fd=epoll_create1(EPOLL_CLOEXEC);
    if(epoll_fd==-1){
        perror("epoll_create1");exit(EXIT_FAILURE);
    }

    std::string server_ip="127.0.0.1";
    uint16_t server_port=8001;

    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        close(epoll_fd);
        perror("socket");exit(EXIT_FAILURE);
    }

    struct sockaddr_in server_addr;
    std::memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr);

    if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
        close(epoll_fd);
        close(server_fd);
        perror("bind");exit(EXIT_FAILURE);
    }

    if(listen(server_fd,10)==-1){
        close(epoll_fd);
        close(server_fd);
        perror("listen");exit(EXIT_FAILURE);
    }

    struct epoll_event epoll_fds[MAX_CONNS+1];
    struct epoll_event ev;
    ev.events=EPOLLIN|EPOLLPRI;
    ev.data.fd=server_fd;

    if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_fd,&ev)==-1){
        close(epoll_fd);
        close(server_fd);
        perror("epoll_ctl");exit(EXIT_FAILURE);
    }

    int cnt=0;
    auto start = std::chrono::high_resolution_clock::now();
    bool wait=false;
    {
        ThreadPools tp(5);
        while(!finish){
            // once the first request has arrived, allow the whole test at most 10000 ms
            if(wait&&std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-start).count()>10000){
                std::cerr<<"test exceeded 10000 ms, terminating\n";
                finish=true;
            }
            if(cnt>=TOTAL_REQUESTS){
                finish=true;
            }
            // timeout 0: poll without blocking so the checks above keep running
            int nevents=epoll_wait(epoll_fd,epoll_fds,MAX_CONNS+1,0);
            for(int i=0;i<nevents;i++){
                if(epoll_fds[i].data.fd==server_fd){
                    // new connection on the listening socket
                    int new_fd=accept(server_fd,nullptr,nullptr);
                    if(new_fd==-1){
                        perror("accept");
                    }else{
                        ev.events = EPOLLIN; // read only, no write events
                        ev.data.fd = new_fd;
                        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &ev);
                    }
                }else{
                    // data from a client; messages are fixed 16 B, and this code assumes each
                    // recv returns exactly one whole message (TCP itself is only a byte stream)
                    struct MessageData msg;
                    ssize_t size=recv(epoll_fds[i].data.fd,&msg,sizeof(msg),0);
                    if(size<0){
                        perror("recv");
                    }else if(size==0){
                        // peer closed the connection: unregister and close the fd
                        epoll_ctl(epoll_fd,EPOLL_CTL_DEL,epoll_fds[i].data.fd,nullptr);
                        close(epoll_fds[i].data.fd);
                    }else{
                        wait=true;
                        cnt++;
                        auto lam=std::bind([](struct MessageData msg){
                            std::lock_guard<std::mutex> lock(sum_mtx); // several workers update the sums
                            LongSum+=msg.c1;DoubleSum+=msg.c2;
                        },msg);
                        tp.enqueue(lam);
                    }
                }
            }
        }
    }   // the pool is destroyed here, which waits for every queued sum update to finish

    close(epoll_fd);
    close(server_fd);
    std::cout<<"get result:"<<"LongSum="<<LongSum<<" DoubleSum="<<DoubleSum<<
        "(correct="<<M_PROCESSES*N_REQUESTS*(N_REQUESTS+1)/2<<" real recv="<<cnt<<", should recv="<<M_PROCESSES*N_REQUESTS<<")\n";
}

client.cpp: the client: a pool of 20 threads sends 10000 message requests in a loop.
// client: thread pool sender
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<cstring>
#include<cstdlib>

#include<sys/socket.h>
#include<arpa/inet.h>
#include<climits>
#include<unistd.h>
#include<fcntl.h>
#include<chrono>


#define N_REQUESTS 10000   // must match what the server expects (TOTAL_REQUESTS on the server side)
#define M_PROCESSES 1

struct MessageData{ // 16B
    long   c1;
    double c2;
};



class ThreadPools{
public:
    ThreadPools(int n_threads): n_threads(n_threads),stop(false){
        for(int i=0;i<n_threads;i++){
            workers.emplace_back([this](){
                while(true){
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mtx);
                        this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});

                        if(this->stop && this->tasks.empty()) return;

                        task=std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();   // run the task outside the lock so workers can actually execute in parallel
                    //std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";
                }
            });
        }
    }
    ~ThreadPools(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop=true;
        }
        cv.notify_all();
        for(std::thread &worker : workers){
            worker.join();
        }
        //std::cout<<"thread pools terminated\n";
    }

    template<typename F>
    auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;

private:
    int n_threads;
    std::atomic<bool> stop;
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<void()>> tasks;
    std::condition_variable cv;
    std::mutex mtx;
};

template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{

    using return_type=typename std::result_of<F()>::type;

    auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    std::future<return_type> res=task->get_future();

    {
        std::unique_lock<std::mutex> lock(mtx);
        if(stop){
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){(*task)();});
    }
    cv.notify_one();
    //std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";

    return res;
}


int main(){

    std::string server_ip="127.0.0.1";
    uint16_t server_port=8001;

    int server_fd=socket(AF_INET,SOCK_STREAM,0);
    if(server_fd==-1){
        perror("socket");exit(EXIT_FAILURE);
    }

    struct sockaddr_in server_addr;
    std::memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_port=htons(server_port);
    inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr);

    if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
        close(server_fd);
        perror("connect");
        exit(EXIT_FAILURE);
    }

    // non-blocking socket: if the send buffer is full, send() fails with EAGAIN,
    // which is counted below as an error
    int flags = fcntl(server_fd, F_GETFL, 0);
    fcntl(server_fd, F_SETFL, flags | O_NONBLOCK);

    std::atomic<int> cnt{0};   // error counter, shared by all worker threads
    {
        ThreadPools tp(20);
        struct MessageData msg;
        for(int i=1;i<N_REQUESTS+1;i++){
            msg.c1=(long)i;msg.c2=double(i);
            auto req=std::bind([](int i,int fd,std::atomic<int>& cnt,struct MessageData msg){
                ssize_t size=send(fd,&msg,sizeof(msg),0);
                if(size<=0) cnt++;
            },i,server_fd,std::ref(cnt),msg);
            tp.enqueue(req);
        }
    }   // the pool is destroyed here, which waits for every queued send to finish

    close(server_fd);
    std::cout<<"send finish, error counts="<<cnt.load()<<"\n";
}