0%

C++ IO多路复用 select模型

原文链接:C++ IO多路复用 select模型

介绍

网络通信本质是IO操作.了解网络通信先了解IO

进程的IO主要分为阻塞IO和非阻塞IO,对IO请求的响应分为同步(进程等待IO响应)和异步(进程不等待)

常见的阻塞IO包括操作系统提供的系统调用如读,写. 非阻塞包括多路IO复用等模型

阻塞IO:进程在IO操作时需要挂起等待
非阻塞IO:进程IO操作时立即返回
IO复用:利用操作系统实现对一组IO通信管理

预备知识

了解多路IO复用需要了解 IO 多进程线程知识 以及网络通信知识
参考

fcntl库:IO控制库

C++ Linux多线程同步通信-信号量

C++ Linux多进程Socket通信

select模型

特性

原理

操作系统内核拿到一组文件描述符,将其视作一组IO设备,将设备的管理任务交给IO系统,IO系统监测到设备事件会中断 让内核维护更新文件描述符的状态. 此时用户进程可以阻塞也可以非阻塞

用户进程通过遍历该组文件描述符判断标志,处理对应IO

优点

跨平台,多数操作系统都支持,实现简单

缺点

IO的访问是线性的,每次都要遍历整组IO,效率不高

库函数<sys/select>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>


select会将每次传入的监听IO集合修改为活动IO集合,也就是说每次传入的fd_set会被修改,因此需要监视的fd_set需要先复制一份,每次循环都要重置

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds: fds种最大的fd+10访问到nfds 如果小了检测不到超过的fd
readfds writefds exceptfds : 读/写/异常的文件描述符集合 不需要管理的传null即可 使用fd_set结构体定义
timeout: 设置查询的时间限制,超过时间直接返回 null表示阻塞
return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)

timeout会返回等待后剩余的时间

struct timeval 结构体可以设置时间,包括s,us
struct timeval{
long tv_sec;//秒
long tv_usec;//微秒
}



fd_set集合是通过一组操作进行管理的
void FD_CLR(int fd, fd_set *set);//从集合中删除fd
int FD_ISSET(int fd, fd_set *set);//集合是否存在fd
return >0(fd在set中) 0(在)
void FD_SET(int fd, fd_set *set);//添加fd
void FD_ZERO(fd_set *set);//清空set内容,将每一位都设置为0;

同时还有一个更高级的pselect

1
2
3
4
5
6
7
8
9
10
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);

struct timespec{
time_t tv_sec;//秒
long tv_nsec;//纳秒
}

主要有两个区别:

1是struct timespec 时间精度更高,并且不更新剩余时间

2是sigmask提供了信号屏蔽功能,可以屏蔽传入的信号集合的中断,实现原子操作

相比select 自然更复杂 但是信号屏蔽功能能够提升通信的稳定性

实例: 实现select监听2个相互通信的进程,转发读写消息.(一个标准的通信转发服务器)

消息读写的问题
  1. 消息异步处理问题
    在网络通信模型中,有个问题在于:虽然消息读出来了,但是接收方此时不一定能写入.

解决方法:

使用消息队列,遍历IO时,将所有可读的消息直接添加到消息队列里(如果消息满了等下轮处理). 如果当前IO可写,从消息队列取出发给该IO的首条消息,没有消息则跳过.

  1. 消息转发标志问题
    由于客户端是通过服务器进行通信的,发件人不知道收件人是谁.

解决方法:

服务器应该维护一个客户端名称映射表,消息中包含发件人和收件人名称. 消息还要包括类型
在首次连接时,客户端发送设置名称的消息,服务器在判断消息类型后,处理名称的合法性,名称合法就加到映射表,通知客户端.
名称不合法就给该客户端发送设置错误的消息.

当然,这种复杂的消息结构和处理不会在实例中实现

  1. 消息在连接前发送

虽然有映射表实现消息转发,但是如果在收件人还未连接前就发送消息,表里面找不到对应的IO,就无法发送.

解决方法:

在大部分场景中,不会出现给服务器未知的对象发送消息,客户端会在发送消息前确保对象是存在的.(发送消息前必然会先和服务器同步数据)

在本实例中可以等都连接上了再发送消息.

IO的处理

进程维护一个fd_set,使用该fd_set作为备份,创建两个用于循环的rfds和wfds,使用select处理.

for循环遍历io时,使用ISSET(i,&rfds) 和 ISSET(i,&wfds) 分别判断是否可读和可写.

可读就把消息放到队列,可写就把消息写入.

程序

测试的时候有几个问题:

  1. 没注意到在第二个连接时,rfds没有重置,包括了新连接信号的server_fd,结果导致server_fd被错误处理了

接收数据遍历时,添加一个fd!=server_fd

  1. 忘了每个client第一条消息是不会被转发的,用于获得id

client都先发送一个数据,用于学习,然后再发送消息.

  1. 由于项目没有考虑更好的连接配置方案,有个问题在于第一个client1连接时连续发送了两个消息,当client2连接时,服务器本轮select处理的是client2连接,以及client1发送的配置id.
    下一轮就处理client1的消息和client2的配置id,处理client1消息在配置id前,导致to的id依旧找不到

解决这个问题比较简单,client发送第二条消息前等几秒,在这几秒之间启动clien1和2,保证id先配置好,然后两者发的消息都能收到

  1. socket消息发送的时候是按照实际的大小发送,预分配的空间不会占满

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[server]
new connect
new connect
mapping client 1 with fd: 4
mapping client 2 with fd: 5
push msg from 1 to 2
send msg from 1 to 2
push msg from 2 to 1
send msg from 2 to 1
message is incomplete: Success
push msg from 2 to 1
send msg from 2 to 1
message is incomplete: Success
push msg from 2 to 1
message is incomplete: Success
push msg from 2 to 1


[client1]
[1] send id
[1] send msg to 2:this is 1, sent to 2
[1] recv msg from 2:this is 2, sent to 1

[client2]
[2] send id
[2] send msg to 1:this is 2, sent to 1
[2] recv msg from 1:this is 1, sent to 2
server.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/select.h>
#include<string>
#include<unistd.h>
#include<unordered_map>
#include<queue>
#include<iostream>
#include <csignal>
using namespace std;

int MAX_CONNECT=2;
const int MAX_SIZE=100;

struct Message{
int from;
int to;
char content[MAX_SIZE];
};

int server_fd;

void ExitlHandler(int sig) {
close(server_fd); //server fd
cout<<"server closed\n";
exit(sig); // 退出程序
}

int main(){


//socket
server_fd=socket(AF_INET,SOCK_STREAM,0);
if(server_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}

signal(SIGINT, ExitlHandler);


struct sockaddr_in server_addr;
string server_ip="127.0.0.1";
uint16_t server_port=8001;

server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);

if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("bind");exit(EXIT_FAILURE);
}

if(listen(server_fd,MAX_CONNECT)==-1){
perror("listen");
exit(EXIT_FAILURE);
}



//select fd_set
fd_set confds,rfds,wfds;
FD_ZERO(&confds);
FD_SET(server_fd,&confds);

struct timeval ts;
ts.tv_sec=0;
ts.tv_usec=0;

int max_fd=server_fd;
int count=0;


//
queue<Message> q;
unordered_map<int,int> id2fd;

//loop
while(true){
// sleep(0.1);
// cout<<"loop"<<"\n";

rfds=confds;
wfds=confds;
int nevent=select(max_fd+1,&rfds,&wfds,nullptr,&ts);
if (nevent ==-1) {
perror("select");

}else if(nevent==0){
continue;
}else;

// new connect
if (FD_ISSET(server_fd, &rfds)) {
int client_fd = accept(server_fd, nullptr, nullptr);
if (client_fd ==-1) {
perror("accept");
} else {
FD_SET(client_fd,&confds);
max_fd=max(client_fd,max_fd);
count++;
cout<<"new connect"<<"\n";
}
}

//确保都连接上
if(count<2) continue;



for(int fd=0;fd<max_fd+1;fd++){

// recv msg
if(FD_ISSET(fd,&rfds)&&fd!=server_fd){
struct Message new_msg;
ssize_t size = recv(fd,(void*)&new_msg,sizeof(new_msg) ,0);

if(size==-1){
perror("recv");
}else if(size<sizeof(new_msg)){
perror("message is incomplete");
}else ;

//第一条消息作为标志,和交换机自学习类似
if(!id2fd.count(new_msg.from)){
id2fd[new_msg.from]=fd;
cout<<"mapping client "<<new_msg.from<<" with fd: "<<fd<<"\n";
}else{
if(!id2fd.count(new_msg.to)){
perror("unknow client:new_msg.to");
}else{
q.push(new_msg);
cout<<"push msg from "<<new_msg.from<<" to "<<new_msg.to<<"\n";
}
}
}

// send msg
if(FD_ISSET(fd,&wfds)){ //循环遍历队列,找到第一个发送到fd的消息就发送,跳出
int n=q.size();
for(int i=0;i<n;i++){
const struct Message m=q.front();q.pop();

if(id2fd[m.to]!=fd){
q.push(m);
}else{
ssize_t size =send(fd,(void*)&m,sizeof(m),0);
if(size<0){
perror("send");
}else if(size==0){
perror("connect closed");
}else{
cout<<"send msg from "<<m.from<<" to "<<m.to<<"\n";
}
break; // 只发送一条.
}
}
}
}

}

return 0;
}


client1.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/select.h>
#include<cstring>
#include<unistd.h>
#include<unordered_map>
#include<queue>
#include<iostream>
using namespace std;


const int MAX_SIZE=100;

struct Message{
int from;
int to;
char content[MAX_SIZE];
};


int main(){

//socket
int socket_fd=socket(AF_INET,SOCK_STREAM,0);
if(socket_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}

struct sockaddr_in server_addr;
string server_ip="127.0.0.1";
uint16_t server_port=8001;

server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);

if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("connect");
exit(EXIT_FAILURE);
}


ssize_t size;

struct Message msg;

msg.from=1;
msg.to=2;
memcpy(msg.content,"this is 1, sent to 2",sizeof("this is 1, sent to 2"));

// 第一条消息用于提交id
size=send(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("send");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[1] send id\n";
}

sleep(4);// 等服务器建立好id映射表

// 实际发送数据
size=send(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("send");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[1] send msg to 2:"<<msg.content<<"\n";
}

// recv
size=recv(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("recv");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[1] recv msg from "<<msg.from<<":"<<msg.content<<"\n";
}


return 0;
}


client2.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/select.h>
#include<cstring>
#include<unistd.h>
#include<unordered_map>
#include<queue>
#include<iostream>
using namespace std;


const int MAX_SIZE=100;

struct Message{
int from;
int to;
char content[MAX_SIZE];
};


int main(){

//socket
int socket_fd=socket(AF_INET,SOCK_STREAM,0);
if(socket_fd==-1){
perror("socket");exit(EXIT_FAILURE);
}

struct sockaddr_in server_addr;
string server_ip="127.0.0.1";
uint16_t server_port=8001;

server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);

if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){
perror("connect");
exit(EXIT_FAILURE);
}


ssize_t size;

struct Message msg;

msg.from=2;
msg.to=1;
memcpy(msg.content,"this is 2, sent to 1",sizeof("this is 2, sent to 1"));

// 第一条消息用于提交id
size=send(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("send");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[2] send id\n";
}

sleep(4);// 等服务器建立好id映射表


// send
size=send(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("send");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[2] send msg to 1:"<<msg.content<<"\n";
}

// recv
size=recv(socket_fd,(void*)&msg,sizeof(msg),0);
if(size<0){
perror("recv");
}else if(size==0){
perror("connect closed");
}else{
cout<<"[2] recv msg from "<<msg.from<<":"<<msg.content<<"\n";
}


return 0;
}