0%

C++ IO多路复用 poll模型

原文链接:C++ IO多路复用 poll模型

预备知识

poll模型前置需要了解的可以参考:

IO控制:fcntl库:IO控制库

多线程:C++ Linux多线程同步通信-信号量

socket:C++ Linux多进程Socket通信

select模型:C++ IO多路复用 select模型

poll模型

特性

原理

poll是对select的改进,主要与select进行比较

优点
  1. IO无数量限制:select用位图检测IO,存在一个MAX_SIZE,但是poll引入链表解决上限问题
  2. poll将监听集合与响应集合分别设置,不需要每次重置集合
缺点
  1. 没有改变select的线性遍历的轮询方法,连接数越多,耗时越长

库函数<sys/poll>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# include <poll>
#include <errno.h>

int poll ( struct pollfd * fdarray, unsigned int nfds, int timeout);
fdarray: 监听的IO数组
nfds: fds数组的长度
timeout: 0(非阻塞) -1(阻塞) >0(时限)

return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)
错误的errno类型:EBADF(存在无效IO),EFAULT(fds越界),EINTR(收到中断信号),EINVAL(fds超过上限),ENOMEM(内存不足)


struct pollfd {
int fd; // 需要监听的IO
short events; // 等待的事件类型
short revents; // 实际发生的事件类型
} ;

events(监听位标志):
POLLIN 所有可读
POLLRDNORM 正常数据可读
POLLRDBAND 高优先级数据可读 (如缓冲区需要优先读取的数据)
POLLPRI 紧急数据可读 (要求立即读取,如TCP外带指针的控制信息) 四种标志区分用于对读事件有更高区分要求的场景
POLLOUT 所有可写
POLLWRNORM 正常数据可写
POLLWRBAND 高优先级数据可写 (特殊协议里面使用,当该IO优先数据的缓冲区可写时标志)
POLLRDHUP 该IO半关闭,不会再给主机发数据,但是还可以接受数据
revents(返回位标志): 相比events,还增加了3个标志
POLLERR IO错误时返回
POLLHUP IO已经关闭
POLLNVAL IO未打开


实例: 实现非阻塞请求响应形式的一个服务器,提供一个四则运算的API, 客户端同步(要求响应后才能再次请求)

请求响应格式
  1. 请求格式
    以http协议为例,请求包含请求头和请求体 最简单的请求头(本例实现的)包括:请求类型,请求主体的字节长度,是否保持连接,是否编码加密.

请求类型这种枚举表示可以用二进制位表示,减小数据包大小

编码加密我们使用简单的数据+10表示.解码-10即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct  RequestData{ //20B
struct RequestHeader; //8B short会对齐
struct RequestBody; //12B
};

struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};

struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
  1. 响应格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct  ResponseData{ //16B
struct ResponseHeader; //8B
struct ResponseBody; //8B
};

struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};

struct ResponseBody{
short method; // 直接返回请求相同类型
int result;
};
IO连接的处理

这种请求响应格式的服务,需要考虑IO响应后的处理,如keepalive的需要保持连接,否则直接发送完后立即断开连接(close fd,从fds中删除 TCP断开是4挥手过程,可以等响应被读出后再断开)

问题

实际写的时候有几个问题:

  1. 消息的处理时,IO复用+线程池+线程同步最关键的就是响应集合的结构:如果是哈希表,每个请求必须响应后才能再次请求(因为哈希表key唯一). 如果是队列,可以支持同一个服务器多个请求,但是队列的结构需要设计

  2. 队列消息最大的问题就是消息不能直接查找,当遍历fd时,需要每次遍历队列找到相应的消息. 当按照消息遍历每次也不能定位fd在数组哪里,并且fd不一定立即可写. 哈希表能够通过fd直接定位消息.

  3. 队列结构如何设计实现:fd可写和首个消息定位都为O(1)?

  4. 响应结束要求断开连接怎么实现?因为fds数组idx和fd对应,如果断开连接,fds的对应就变化了,因此哈希表不能用来记录索引和fd对应项.断开连接只能重新移动赋值一个新的fds

程序

实际遇到的bug:

  1. 事件判断的时候错误使用了|符号confds[0].revents|POLLIN

    正确的为&按位与

  2. accept没有错误检测
    添加

  3. 循环开始时要清空所有IO的revent=0

server.cpp:接收请求,创建线程处理,将结果添加哈希表项,等响应时写回,并删除表项,同时对连接数组重新处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#include<iostream>
#include<cstring>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/poll.h>
#include<unordered_map>
#include<semaphore.h>
#include<thread>
#include<climits>
#include<unistd.h>
using namespace std;

#define SLEEP_TIME 1
#define MAX_CONNECT 4
#define MAX_RUNNING 4


#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8

#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3


struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};
struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
struct RequestData{ //20B
struct RequestHeader header; //8B short会对齐
struct RequestBody body; //12B
};


struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};
struct ResponseBody{
short method; // 直接返回请求相同类型
int result; // return nullptr if error
};
struct ResponseData{ //16B
struct ResponseHeader header; //8B
struct ResponseBody body; //8B
};




int running_threads=0;

unordered_map<int,struct ResponseData*> responses;

sem_t sem_thread;

// 线程处理请求函数
int processing_func(int fd,RequestData reqdata){
struct ResponseHeader resheader;
struct ResponseBody resbody;
resheader.type=reqdata.header.type;
resheader.bodylen=RESPONSEBODY_SIZE;
resbody.method=reqdata.body.method;
resbody.result=INT_MAX;

struct ResponseData resdata;
resdata.header=resheader;
resdata.body=resbody;

int result=0,num1=(reqdata.body).num1,num2=(reqdata.body).num2;
if(resheader.type&ENCODE){
num1-=10;num2-=10;
}

//处理
if(resbody.method&ADD){
result=num1+num2;
}else if(resbody.method&SUB){
result=num1-num2;
}else if(resbody.method&TIMES){
result=num1*num2;
}else if(resbody.method&DIVIDE){
if(num2==0) result=INT_MAX;
else{
result=num1/num2;
}
}else{
perror("error request body.method");
}

resbody.result=result;

//临界区添加数据
sem_wait(&sem_thread);
if(!responses.count(fd)){
responses[fd]=&resdata;
}else{
perror("responses get request from same fd twice");
}
sem_post(&sem_thread);
return 0;
}


int main(){
//信号量同步
sem_init(&sem_thread, 0, 1);

string server_ip="127.0.0.1";
uint16_t server_port=8001;

int server_fd=socket(AF_INET,SOCK_STREAM,0);
if(server_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}

struct sockaddr_in server_addr;
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);

if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("bind");exit(EXIT_FAILURE);
}

if(listen(server_fd,10)==-1){
perror("listen");exit(EXIT_FAILURE);
}

struct pollfd confds[MAX_CONNECT+1];
confds[0].fd=server_fd;
confds[0].events=POLLIN;

struct RequestHeader reqheader;
struct RequestBody reqbody;
reqheader.type=0;
reqheader.bodylen=REQUESTBODY_SIZE;
reqbody.method=0;
reqbody.num1=0;
reqbody.num2=0;

struct RequestData reqdata;
reqdata.header=reqheader;
reqdata.body=reqbody;





int cur_cons=1;
while(true){
int nevents=poll(confds,cur_cons,0);
if(nevents==-1){
perror("poll");
}else if(nevents==0){
cout<<"no events"<<"\n";
sleep(SLEEP_TIME);
continue;
}else ;

//添加新连接
if((confds[0].revents|POLLIN)&&cur_cons<MAX_CONNECT+1){
int new_fd=accept(server_fd,nullptr,nullptr);
confds[cur_cons].fd=new_fd;
confds[cur_cons].events=POLLIN|POLLOUT;
confds[cur_cons].revents=0;
cur_cons++;
}

//遍历处理IO事件
for(int i=1;i<cur_cons;i++){
//遍历接受数据
if((confds[i].revents&POLLIN)&&(running_threads<MAX_RUNNING)){
ssize_t size=recv(confds[i].fd,&reqdata,REQUESTDATA_SIZE,0);
if(size<=0){
perror("recv");exit(EXIT_FAILURE);
}else{
thread t(processing_func,confds[i].fd,reqdata);
if(t.joinable()) t.join();
cout<<"recv request: request size=="<<size<<"(REQUESTDATA_SIZE=20)\n";
}
}

//遍历发送数据
if(confds[i].revents&POLLOUT){
int fd=confds[i].fd;
if(responses.count(fd)){
ssize_t size=send(confds[i].fd,responses[fd],RESPONSEDATA_SIZE,0);
if(size<=0){
perror("send");
}else{
cout<<"send response: response size=="<<size<<"(RESPONSEDATA_SIZE=16)\n";
if(!(responses[fd]->header.type&ALIVE)){
if(close(fd)==-1){
perror("close");
}
struct pollfd new_confds[MAX_CONNECT+1];
//删除第i个连接
memcpy(new_confds, confds, i*sizeof(struct pollfd));
memcpy(new_confds + i * sizeof(struct pollfd), confds+(i+1)*sizeof(struct pollfd), (MAX_CONNECT-i-1)*sizeof(struct pollfd));

//删除哈希表数据,连接数-1
sem_wait(&sem_thread);
responses.erase(fd);
sem_post(&sem_thread);
cur_cons--;
}
}
}
}
}
}
return 0;
}
client.cpp 单个进程计算请求,阻塞,等待结果返回输出.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include<sys/socket.h>
#include<arpa/inet.h>
#include<cstring>
#include<unistd.h>
#include<iostream>
#include<thread>
#include<unistd.h>
#include<random>
#include<unordered_map>
using namespace std;

#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8

#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3


struct RequestHeader{
short type; // 按位表示|encode|keep-alive| 两位表示4种情况
int bodylen; //
};
struct RequestBody{
short method; // 按位表示: add|sub|times|divide| 两位表示4种情况
int num1;
int num2;
};
struct RequestData{ //20B
struct RequestHeader header; //8B short会对齐
struct RequestBody body; //12B
};


struct ResponseHeader{
short type; // 直接返回请求相同类型
int bodylen; // 数据大小
};
struct ResponseBody{
short method; // 直接返回请求相同类型
int result; // return nullptr if error
};
struct ResponseData{ //16B
struct ResponseHeader header; //8B
struct ResponseBody body; //8B
};

unordered_map<int,string> um={
{ADD,"ADD"},
{SUB,"SUB"},
{TIMES,"TIMES"},
{DIVIDE,"DIVIDE"}
};



random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> randtype(0, 2);
uniform_int_distribution<> randmethod(0, 3);
uniform_int_distribution<> randnum(0, 100);


void postfunc(string id,int server_fd,struct sockaddr_in server_addr,struct RequestData reqdata){

if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("connect");
return ;
}

reqdata.header.type=randtype(gen);
reqdata.body.method=randmethod(gen);
reqdata.body.num1=randnum(gen);
reqdata.body.num2=randnum(gen);


if((reqdata.header.type&ENCODE)){
reqdata.body.num1+=10;
reqdata.body.num2+=10;
}

// send
ssize_t size=send(server_fd,(void*)&reqdata,REQUESTDATA_SIZE,0);
if(size<0){
perror("send");
exit(EXIT_FAILURE);
}else if(size==0){
perror("connect closed");
}else{
cout<<"thread"<<id<<" send success"<<"\n";
}


struct ResponseData resdata;


// recv
size=recv(server_fd,(void*)&resdata,sizeof(resdata),0);
if(size<0){
perror("recv");
}else if(size==0){
perror("connect closed");
}else{
cout<<"thread"<<id<<":"<<"[ENCODE="<<(reqdata.header.type&ENCODE)<<" ALIVE="<<(reqdata.header.type&ALIVE)
<<"num1="<<reqdata.body.num1<<" "<<um[reqdata.body.method]<<" num2="<<reqdata.body.num2<<" Result="<<resdata.body.result<<"\n";
}

}



int main(int argc,char* argv){

string id;
if(argc==0){
cout<<"no id args"<<"\n";
}else{
id=argv[0];
}

//socket
int server_fd=socket(AF_INET,SOCK_STREAM,0);
if(server_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}

struct sockaddr_in server_addr;
string server_ip="127.0.0.1";
uint16_t server_port=8001;

server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);



struct RequestHeader reqheader;
struct RequestBody reqbody;
reqheader.type=0;
reqheader.bodylen=REQUESTBODY_SIZE;
reqbody.method=0;
reqbody.num1=0;
reqbody.num2=0;

struct RequestData reqdata;
reqdata.header=reqheader;
reqdata.body=reqbody;


postfunc(id,server_fd,server_addr,reqdata);

return 0;
}
multi_client.cpp 10个进程同时请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49


#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include<filesystem>
using namespace std;

int child_process(const char* program_name,const char* args[]){
pid_t cpid=fork();
if(cpid==-1){
perror("fork");
exit(EXIT_FAILURE);
}else if(cpid>0){
return cpid;
}else{
execve(program_name,const_cast<char* const*>(args),nullptr);
perror("execve");
exit(EXIT_FAILURE);
}

}

int main()
{
char* program_name=(char*)"./c.cpp";
if(!std::filesystem::exists(program_name)){
cout<<"file not exists\n";
exit(EXIT_FAILURE);
}

vector<pid_t> childs(10);

for(int i=0;i<10;i++){
string process_id="process_"+to_string(i);
const char* args[]={program_name,process_id.c_str(),nullptr};
childs[i]=child_process(program_name,args);
}

pid_t cpid;
while((cpid=wait(nullptr))>0){
cout<<"child."<<cpid<<"terminated\n";
}

return 0;
}