原文链接:C++ IO多路复用 select模型
介绍
网络通信本质是IO操作.了解网络通信先了解IO
进程的IO主要分为阻塞IO和非阻塞IO,对IO请求的响应分为同步(进程等待IO响应)和异步(进程不等待)
常见的阻塞IO包括操作系统提供的系统调用如读,写. 非阻塞包括多路IO复用等模型
阻塞IO:进程在IO操作时需要挂起等待
非阻塞IO:进程IO操作时立即返回
IO复用:利用操作系统实现对一组IO通信管理
预备知识
了解多路IO复用需要了解 IO 多进程线程知识 以及网络通信知识
参考
fcntl库:IO控制库
C++ Linux多线程同步通信-信号量
C++ Linux多进程Socket通信
select模型
特性
原理
操作系统内核拿到一组文件描述符,将其视作一组IO设备,将设备的管理任务交给IO系统,IO系统监测到设备事件会中断 让内核维护更新文件描述符的状态. 此时用户进程可以阻塞也可以非阻塞
用户进程通过遍历该组文件描述符判断标志,处理对应IO
优点
跨平台,多数操作系统都支持,实现简单
缺点
IO的访问是线性的,每次都要遍历整组IO,效率不高
库函数<sys/select>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <sys/select.h> #include <sys/time.h> #include <sys/types.h>
select会将每次传入的监听IO集合修改为活动IO集合,也就是说每次传入的fd_set会被修改,因此需要监视的fd_set需要先复制一份,每次循环都要重置
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); nfds: fds种最大的fd+1 从0访问到nfds 如果小了检测不到超过的fd readfds writefds exceptfds : 读/写/异常的文件描述符集合 不需要管理的传null即可 使用fd_set结构体定义 timeout: 设置查询的时间限制,超过时间直接返回 null表示阻塞 return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)
timeout会返回等待后剩余的时间
struct timeval 结构体可以设置时间,包括s,us struct timeval{ long tv_sec; long tv_usec; }
fd_set集合是通过一组操作进行管理的 void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); return >0(fd在set中) 0(在) void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
|
同时还有一个更高级的pselect
1 2 3 4 5 6 7 8 9 10
| #include <sys/select.h> int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
struct timespec{ time_t tv_sec; long tv_nsec; }
|
主要有两个区别:
1是struct timespec 时间精度更高,并且不更新剩余时间
2是sigmask提供了信号屏蔽功能,可以屏蔽传入的信号集合的中断,实现原子操作
相比select 自然更复杂 但是信号屏蔽功能能够提升通信的稳定性
实例: 实现select监听2个相互通信的进程,转发读写消息.(一个标准的通信转发服务器)
消息读写的问题
- 消息异步处理问题
在网络通信模型中,有个问题在于:虽然消息读出来了,但是接收方此时不一定能写入.
解决方法:
使用消息队列,遍历IO时,将所有可读的消息直接添加到消息队列里(如果消息满了等下轮处理). 如果当前IO可写,从消息队列取出发给该IO的首条消息,没有消息则跳过.
- 消息转发标志问题
由于客户端是通过服务器进行通信的,发件人不知道收件人是谁.
解决方法:
服务器应该维护一个客户端名称映射表,消息中包含发件人和收件人名称. 消息还要包括类型
在首次连接时,客户端发送设置名称的消息,服务器在判断消息类型后,处理名称的合法性,名称合法就加到映射表,通知客户端.
名称不合法就给该客户端发送设置错误的消息.
当然,这种复杂的消息结构和处理不会在实例中实现
- 消息在连接前发送
虽然有映射表实现消息转发,但是如果在收件人还未连接前就发送消息,表里面找不到对应的IO,就无法发送.
解决方法:
在大部分场景中,不会出现给服务器未知的对象发送消息,客户端会在发送消息前确保对象是存在的.(发送消息前必然会先和服务器同步数据)
在本实例中可以等都连接上了再发送消息.
IO的处理
进程维护一个fd_set,使用该fd_set作为备份,创建两个用于循环的rfds和wfds,使用select处理.
for循环遍历io时,使用ISSET(i,&rfds) 和 ISSET(i,&wfds) 分别判断是否可读和可写.
可读就把消息放到队列,可写就把消息写入.
程序
测试的时候有几个问题:
- 没注意到在第二个连接时,rfds没有重置,包括了新连接信号的server_fd,结果导致server_fd被错误处理了
接收数据遍历时,添加一个fd!=server_fd
- 忘了每个client第一条消息是不会被转发的,用于获得id
client都先发送一个数据,用于学习,然后再发送消息.
- 由于项目没有考虑更好的连接配置方案,有个问题在于第一个client1连接时连续发送了两个消息,当client2连接时,服务器本轮select处理的是client2连接,以及client1发送的配置id.
下一轮就处理client1的消息和client2的配置id,处理client1消息在配置id前,导致to的id依旧找不到
解决这个问题比较简单,client发送第二条消息前等几秒,在这几秒之间启动clien1和2,保证id先配置好,然后两者发的消息都能收到
- socket消息发送的时候是按照实际的大小发送,预分配的空间不会占满
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| [server] new connect new connect mapping client 1 with fd: 4 mapping client 2 with fd: 5 push msg from 1 to 2 send msg from 1 to 2 push msg from 2 to 1 send msg from 2 to 1 message is incomplete: Success push msg from 2 to 1 send msg from 2 to 1 message is incomplete: Success push msg from 2 to 1 message is incomplete: Success push msg from 2 to 1
[client1] [1] send id [1] send msg to 2:this is 1, sent to 2 [1] recv msg from 2:this is 2, sent to 1
[client2] [2] send id [2] send msg to 1:this is 2, sent to 1 [2] recv msg from 1:this is 1, sent to 2
|
server.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<string> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> #include <csignal> using namespace std;
int MAX_CONNECT=2; const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int server_fd;
void ExitlHandler(int sig) { close(server_fd); cout<<"server closed\n"; exit(sig); }
int main(){
server_fd=socket(AF_INET,SOCK_STREAM,0); if(server_fd==-1){ perror("socket");exit(EXIT_FAILURE); }
signal(SIGINT, ExitlHandler);
struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("bind");exit(EXIT_FAILURE); } if(listen(server_fd,MAX_CONNECT)==-1){ perror("listen"); exit(EXIT_FAILURE); } fd_set confds,rfds,wfds; FD_ZERO(&confds); FD_SET(server_fd,&confds); struct timeval ts; ts.tv_sec=0; ts.tv_usec=0; int max_fd=server_fd; int count=0; queue<Message> q; unordered_map<int,int> id2fd; while(true){ rfds=confds; wfds=confds; int nevent=select(max_fd+1,&rfds,&wfds,nullptr,&ts); if (nevent ==-1) { perror("select");
}else if(nevent==0){ continue; }else; if (FD_ISSET(server_fd, &rfds)) { int client_fd = accept(server_fd, nullptr, nullptr); if (client_fd ==-1) { perror("accept"); } else { FD_SET(client_fd,&confds); max_fd=max(client_fd,max_fd); count++; cout<<"new connect"<<"\n"; } } if(count<2) continue;
for(int fd=0;fd<max_fd+1;fd++){ if(FD_ISSET(fd,&rfds)&&fd!=server_fd){ struct Message new_msg; ssize_t size = recv(fd,(void*)&new_msg,sizeof(new_msg) ,0); if(size==-1){ perror("recv"); }else if(size<sizeof(new_msg)){ perror("message is incomplete"); }else ; if(!id2fd.count(new_msg.from)){ id2fd[new_msg.from]=fd; cout<<"mapping client "<<new_msg.from<<" with fd: "<<fd<<"\n"; }else{ if(!id2fd.count(new_msg.to)){ perror("unknow client:new_msg.to"); }else{ q.push(new_msg); cout<<"push msg from "<<new_msg.from<<" to "<<new_msg.to<<"\n"; } } } if(FD_ISSET(fd,&wfds)){ int n=q.size(); for(int i=0;i<n;i++){ const struct Message m=q.front();q.pop(); if(id2fd[m.to]!=fd){ q.push(m); }else{ ssize_t size =send(fd,(void*)&m,sizeof(m),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"send msg from "<<m.from<<" to "<<m.to<<"\n"; } break; } } } } } return 0; }
|
client1.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<cstring> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> using namespace std;
const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int main(){ int socket_fd=socket(AF_INET,SOCK_STREAM,0); if(socket_fd==-1){ perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("connect"); exit(EXIT_FAILURE); } ssize_t size; struct Message msg; msg.from=1; msg.to=2; memcpy(msg.content,"this is 1, sent to 2",sizeof("this is 1, sent to 2")); size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] send id\n"; }
sleep(4);
size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] send msg to 2:"<<msg.content<<"\n"; } size=recv(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("recv"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] recv msg from "<<msg.from<<":"<<msg.content<<"\n"; } return 0; }
|
client2.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<cstring> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> using namespace std;
const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int main(){ int socket_fd=socket(AF_INET,SOCK_STREAM,0); if(socket_fd==-1){ perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("connect"); exit(EXIT_FAILURE); } ssize_t size; struct Message msg; msg.from=2; msg.to=1; memcpy(msg.content,"this is 2, sent to 1",sizeof("this is 2, sent to 1")); size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] send id\n"; }
sleep(4);
size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] send msg to 1:"<<msg.content<<"\n"; } size=recv(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("recv"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] recv msg from "<<msg.from<<":"<<msg.content<<"\n"; } return 0; }
|