原文链接:C++ IO多路复用 select模型
介绍
网络通信本质是IO操作.了解网络通信先了解IO
进程的IO主要分为阻塞IO和非阻塞IO,对IO请求的响应分为同步(进程等待IO响应)和异步(进程不等待)
常见的阻塞IO包括操作系统提供的系统调用如读,写. 非阻塞包括多路IO复用等模型
阻塞IO:进程在IO操作时需要挂起等待
非阻塞IO:进程IO操作时立即返回
IO复用:利用操作系统实现对一组IO通信管理
预备知识
了解多路IO复用需要了解 IO 多进程线程知识 以及网络通信知识
参考
fcntl库:IO控制库
C++ Linux多线程同步通信-信号量
C++ Linux多进程Socket通信
select模型
特性
原理
操作系统内核拿到一组文件描述符,将其视作一组IO设备,将设备的管理任务交给IO系统,IO系统监测到设备事件会中断 让内核维护更新文件描述符的状态. 此时用户进程可以阻塞也可以非阻塞
用户进程通过遍历该组文件描述符判断标志,处理对应IO
优点
跨平台,多数操作系统都支持,实现简单
缺点
IO的访问是线性的,每次都要遍历整组IO,效率不高
库函数<sys/select>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <sys/select.h> #include <sys/time.h> #include <sys/types.h>
select会将每次传入的监听IO集合修改为活动IO集合,也就是说每次传入的fd_set会被修改,因此需要监视的fd_set需要先复制一份,每次循环都要重置
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); nfds: fds种最大的fd+1 从0访问到nfds 如果小了检测不到超过的fd readfds writefds exceptfds : 读/写/异常的文件描述符集合 不需要管理的传null即可 使用fd_set结构体定义 timeout: 设置查询的时间限制,超过时间直接返回 null表示阻塞 return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)
timeout会返回等待后剩余的时间
struct timeval 结构体可以设置时间,包括s,us struct timeval{ long tv_sec; long tv_usec; }
fd_set集合是通过一组操作进行管理的 void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); return >0(fd在set中) 0(在) void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
|
同时还有一个更高级的pselect
1 2 3 4 5 6 7 8 9 10
| #include <sys/select.h> int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
struct timespec{ time_t tv_sec; long tv_nsec; }
|
主要有两个区别:
1是struct timespec 时间精度更高,并且不更新剩余时间
2是sigmask提供了信号屏蔽功能,可以屏蔽传入的信号集合的中断,实现原子操作
相比select 自然更复杂 但是信号屏蔽功能能够提升通信的稳定性
实例: 实现select监听2个相互通信的进程,转发读写消息.(一个标准的通信转发服务器)
消息读写的问题
- 消息异步处理问题
在网络通信模型中,有个问题在于:虽然消息读出来了,但是接收方此时不一定能写入.
解决方法:
使用消息队列,遍历IO时,将所有可读的消息直接添加到消息队列里(如果消息满了等下轮处理). 如果当前IO可写,从消息队列取出发给该IO的首条消息,没有消息则跳过.
- 消息转发标志问题
由于客户端是通过服务器进行通信的,发件人不知道收件人是谁.
解决方法:
服务器应该维护一个客户端名称映射表,消息中包含发件人和收件人名称. 消息还要包括类型
在首次连接时,客户端发送设置名称的消息,服务器在判断消息类型后,处理名称的合法性,名称合法就加到映射表,通知客户端.
名称不合法就给该客户端发送设置错误的消息.
当然,这种复杂的消息结构和处理不会在实例中实现
- 消息在连接前发送
虽然有映射表实现消息转发,但是如果在收件人还未连接前就发送消息,表里面找不到对应的IO,就无法发送.
解决方法:
在大部分场景中,不会出现给服务器未知的对象发送消息,客户端会在发送消息前确保对象是存在的.(发送消息前必然会先和服务器同步数据)
在本实例中可以等都连接上了再发送消息.
IO的处理
进程维护一个fd_set,使用该fd_set作为备份,创建两个用于循环的rfds和wfds,使用select处理.
for循环遍历io时,使用ISSET(i,&rfds) 和 ISSET(i,&wfds) 分别判断是否可读和可写.
可读就把消息放到队列,可写就把消息写入.
程序
测试的时候有几个问题:
- 没注意到在第二个连接时,rfds没有重置,包括了新连接信号的server_fd,结果导致server_fd被错误处理了
接收数据遍历时,添加一个fd!=server_fd
- 忘了每个client第一条消息是不会被转发的,用于获得id
client都先发送一个数据,用于学习,然后再发送消息.
- 由于项目没有考虑更好的连接配置方案,有个问题在于第一个client1连接时连续发送了两个消息,当client2连接时,服务器本轮select处理的是client2连接,以及client1发送的配置id.
下一轮就处理client1的消息和client2的配置id,处理client1消息在配置id前,导致to的id依旧找不到
解决这个问题比较简单,client发送第二条消息前等几秒,在这几秒之间启动clien1和2,保证id先配置好,然后两者发的消息都能收到
- socket消息发送的时候是按照实际的大小发送,预分配的空间不会占满
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| [server] new connect new connect mapping client 1 with fd: 4 mapping client 2 with fd: 5 push msg from 1 to 2 send msg from 1 to 2 push msg from 2 to 1 send msg from 2 to 1 message is incomplete: Success push msg from 2 to 1 send msg from 2 to 1 message is incomplete: Success push msg from 2 to 1 message is incomplete: Success push msg from 2 to 1
[client1] [1] send id [1] send msg to 2:this is 1, sent to 2 [1] recv msg from 2:this is 2, sent to 1
[client2] [2] send id [2] send msg to 1:this is 2, sent to 1 [2] recv msg from 1:this is 1, sent to 2
|
server.cpp

| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<string> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> #include <csignal> using namespace std;
int MAX_CONNECT=2; const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int server_fd;
void ExitlHandler(int sig) { close(server_fd); cout<<"server closed\n"; exit(sig); }
int main(){
server_fd=socket(AF_INET,SOCK_STREAM,0); if(server_fd==-1){ perror("socket");exit(EXIT_FAILURE); }
signal(SIGINT, ExitlHandler);
struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("bind");exit(EXIT_FAILURE); } if(listen(server_fd,MAX_CONNECT)==-1){ perror("listen"); exit(EXIT_FAILURE); } fd_set confds,rfds,wfds; FD_ZERO(&confds); FD_SET(server_fd,&confds); struct timeval ts; ts.tv_sec=0; ts.tv_usec=0; int max_fd=server_fd; int count=0; queue<Message> q; unordered_map<int,int> id2fd; while(true){ rfds=confds; wfds=confds; int nevent=select(max_fd+1,&rfds,&wfds,nullptr,&ts); if (nevent ==-1) { perror("select");
}else if(nevent==0){ continue; }else; if (FD_ISSET(server_fd, &rfds)) { int client_fd = accept(server_fd, nullptr, nullptr); if (client_fd ==-1) { perror("accept"); } else { FD_SET(client_fd,&confds); max_fd=max(client_fd,max_fd); count++; cout<<"new connect"<<"\n"; } } if(count<2) continue;
for(int fd=0;fd<max_fd+1;fd++){ if(FD_ISSET(fd,&rfds)&&fd!=server_fd){ struct Message new_msg; ssize_t size = recv(fd,(void*)&new_msg,sizeof(new_msg) ,0); if(size==-1){ perror("recv"); }else if(size<sizeof(new_msg)){ perror("message is incomplete"); }else ; if(!id2fd.count(new_msg.from)){ id2fd[new_msg.from]=fd; cout<<"mapping client "<<new_msg.from<<" with fd: "<<fd<<"\n"; }else{ if(!id2fd.count(new_msg.to)){ perror("unknow client:new_msg.to"); }else{ q.push(new_msg); cout<<"push msg from "<<new_msg.from<<" to "<<new_msg.to<<"\n"; } } } if(FD_ISSET(fd,&wfds)){ int n=q.size(); for(int i=0;i<n;i++){ const struct Message m=q.front();q.pop(); if(id2fd[m.to]!=fd){ q.push(m); }else{ ssize_t size =send(fd,(void*)&m,sizeof(m),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"send msg from "<<m.from<<" to "<<m.to<<"\n"; } break; } } } } } return 0; }
|
client1.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<cstring> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> using namespace std;
const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int main(){ int socket_fd=socket(AF_INET,SOCK_STREAM,0); if(socket_fd==-1){ perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("connect"); exit(EXIT_FAILURE); } ssize_t size; struct Message msg; msg.from=1; msg.to=2; memcpy(msg.content,"this is 1, sent to 2",sizeof("this is 1, sent to 2")); size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] send id\n"; }
sleep(4);
size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] send msg to 2:"<<msg.content<<"\n"; } size=recv(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("recv"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[1] recv msg from "<<msg.from<<":"<<msg.content<<"\n"; } return 0; }
|
client2.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| #include<sys/socket.h> #include<arpa/inet.h> #include<sys/select.h> #include<cstring> #include<unistd.h> #include<unordered_map> #include<queue> #include<iostream> using namespace std;
const int MAX_SIZE=100;
struct Message{ int from; int to; char content[MAX_SIZE]; };
int main(){ int socket_fd=socket(AF_INET,SOCK_STREAM,0); if(socket_fd==-1){ perror("socket");exit(EXIT_FAILURE); } struct sockaddr_in server_addr; string server_ip="127.0.0.1"; uint16_t server_port=8001; server_addr.sin_family=AF_INET; server_addr.sin_port=htons(server_port); inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr); if(connect(socket_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){ perror("connect"); exit(EXIT_FAILURE); } ssize_t size; struct Message msg; msg.from=2; msg.to=1; memcpy(msg.content,"this is 2, sent to 1",sizeof("this is 2, sent to 1")); size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] send id\n"; }
sleep(4);
size=send(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("send"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] send msg to 1:"<<msg.content<<"\n"; } size=recv(socket_fd,(void*)&msg,sizeof(msg),0); if(size<0){ perror("recv"); }else if(size==0){ perror("connect closed"); }else{ cout<<"[2] recv msg from "<<msg.from<<":"<<msg.content<<"\n"; } return 0; }
|